Diffstat (limited to 'yt_dlp/downloader/fragment.py')
-rw-r--r-- | yt_dlp/downloader/fragment.py | 47
1 file changed, 37 insertions, 10 deletions
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
index 04b0f68c0..79c6561c7 100644
--- a/yt_dlp/downloader/fragment.py
+++ b/yt_dlp/downloader/fragment.py
@@ -1,9 +1,10 @@
 from __future__ import division, unicode_literals
 
+import http.client
+import json
+import math
 import os
 import time
-import json
-from math import ceil
 
 try:
     import concurrent.futures
@@ -15,6 +16,7 @@ from .common import FileDownloader
 from .http import HttpFD
 from ..aes import aes_cbc_decrypt_bytes
 from ..compat import (
+    compat_os_name,
     compat_urllib_error,
     compat_struct_pack,
 )
@@ -90,7 +92,7 @@ class FragmentFD(FileDownloader):
         self._start_frag_download(ctx, info_dict)
 
     def __do_ytdl_file(self, ctx):
-        return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
+        return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
 
     def _read_ytdl_file(self, ctx):
         assert 'ytdl_corrupt' not in ctx
@@ -375,17 +377,20 @@ class FragmentFD(FileDownloader):
         @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
         all args must be either tuple or list
         '''
+        interrupt_trigger = [True]
         max_progress = len(args)
         if max_progress == 1:
             return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
-        max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
+        max_workers = self.params.get('concurrent_fragment_downloads', 1)
         if max_progress > 1:
             self._prepare_multiline_status(max_progress)
 
         def thread_func(idx, ctx, fragments, info_dict, tpe):
             ctx['max_progress'] = max_progress
             ctx['progress_idx'] = idx
-            return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
+            return self.download_and_append_fragments(
+                ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
+                tpe=tpe, interrupt_trigger=interrupt_trigger)
 
         class FTPE(concurrent.futures.ThreadPoolExecutor):
             # has to stop this or it's going to wait on the worker thread itself
@@ -393,8 +398,11 @@ class FragmentFD(FileDownloader):
                 pass
 
         spins = []
+        if compat_os_name == 'nt':
+            self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. '
+                                'This is a known issue and patches are welcome')
         for idx, (ctx, fragments, info_dict) in enumerate(args):
-            tpe = FTPE(ceil(max_workers / max_progress))
+            tpe = FTPE(math.ceil(max_workers / max_progress))
             job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
             spins.append((tpe, job))
 
@@ -402,18 +410,32 @@ class FragmentFD(FileDownloader):
         for tpe, job in spins:
             try:
                 result = result and job.result()
+            except KeyboardInterrupt:
+                interrupt_trigger[0] = False
             finally:
                 tpe.shutdown(wait=True)
+        if not interrupt_trigger[0]:
+            raise KeyboardInterrupt()
         return result
 
-    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
+    def download_and_append_fragments(
+            self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
+            tpe=None, interrupt_trigger=None):
+        if not interrupt_trigger:
+            interrupt_trigger = (True, )
+
         fragment_retries = self.params.get('fragment_retries', 0)
-        is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
+        is_fatal = (
+            ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
+            if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))
+
         if not pack_func:
             pack_func = lambda frag_content, _: frag_content
 
         def download_fragment(fragment, ctx):
             frag_index = ctx['fragment_index'] = fragment['frag_index']
+            if not interrupt_trigger[0]:
+                return False, frag_index
             headers = info_dict.get('http_headers', {}).copy()
             byte_range = fragment.get('byte_range')
             if byte_range:
@@ -428,7 +450,7 @@ class FragmentFD(FileDownloader):
                     if not success:
                         return False, frag_index
                     break
-                except compat_urllib_error.HTTPError as err:
+                except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
                     # Unavailable (possibly temporary) fragments may be served.
                     # First we try to retry then either skip or abort.
                     # See https://github.com/ytdl-org/youtube-dl/issues/10165,
@@ -466,7 +488,8 @@ class FragmentFD(FileDownloader):
 
         decrypt_fragment = self.decrypter(info_dict)
 
-        max_workers = self.params.get('concurrent_fragment_downloads', 1)
+        max_workers = math.ceil(
+            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
         if can_threaded_download and max_workers > 1:
 
             def _download_fragment(fragment):
@@ -477,6 +500,8 @@ class FragmentFD(FileDownloader):
             self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
             with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                 for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
+                    if not interrupt_trigger[0]:
+                        break
                     ctx['fragment_filename_sanitized'] = frag_filename
                     ctx['fragment_index'] = frag_index
                     result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
@@ -484,6 +509,8 @@ class FragmentFD(FileDownloader):
                         return False
         else:
             for fragment in fragments:
+                if not interrupt_trigger[0]:
+                    break
                 frag_content, frag_index = download_fragment(fragment, ctx)
                 result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                 if not result:
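
Note on the pattern used above: interrupt_trigger is a one-element list shared between the main thread and the per-progress worker threads. Workers check interrupt_trigger[0] before each fragment; the main thread flips it to False when KeyboardInterrupt is raised while it waits on the futures, and re-raises once every executor has shut down. The sketch below is a simplified, hypothetical illustration of that idea only, not yt-dlp's actual code (download_group and download_all are made-up names, and the per-fragment work is replaced by a sleep).

import concurrent.futures
import time

# One-element list acting as a shared "keep going" flag between the main
# thread and the worker threads (same idea as interrupt_trigger above).
interrupt_trigger = [True]


def download_group(idx, fragments):
    # Stand-in for downloading one format's fragments; the flag is checked
    # before each fragment so the worker can stop early after Ctrl+C.
    done = 0
    for _ in fragments:
        if not interrupt_trigger[0]:
            break
        time.sleep(0.01)  # placeholder for fetching and appending one fragment
        done += 1
    return done == len(fragments)


def download_all(groups):
    result = True
    spins = []
    for idx, fragments in enumerate(groups):
        tpe = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        spins.append((tpe, tpe.submit(download_group, idx, fragments)))
    for tpe, job in spins:
        try:
            result = result and job.result()
        except KeyboardInterrupt:
            # Ctrl+C lands in the main thread; tell the workers to wind down
            interrupt_trigger[0] = False
        finally:
            tpe.shutdown(wait=True)
    if not interrupt_trigger[0]:
        # Re-raise only after all executors have been shut down cleanly
        raise KeyboardInterrupt()
    return result


if __name__ == '__main__':
    print(download_all([range(50), range(50)]))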