diff options
Diffstat (limited to 'yt_dlp/downloader/fragment.py')
-rw-r--r-- | yt_dlp/downloader/fragment.py | 39 |
1 files changed, 29 insertions, 10 deletions
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py index 19c0990d3..7b213cd5f 100644 --- a/yt_dlp/downloader/fragment.py +++ b/yt_dlp/downloader/fragment.py @@ -25,6 +25,7 @@ from ..utils import ( error_to_compat_str, encodeFilename, sanitized_Request, + traverse_obj, ) @@ -382,6 +383,7 @@ class FragmentFD(FileDownloader): max_workers = self.params.get('concurrent_fragment_downloads', 1) if max_progress > 1: self._prepare_multiline_status(max_progress) + is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[])) def thread_func(idx, ctx, fragments, info_dict, tpe): ctx['max_progress'] = max_progress @@ -395,25 +397,43 @@ class FragmentFD(FileDownloader): def __exit__(self, exc_type, exc_val, exc_tb): pass - spins = [] if compat_os_name == 'nt': - self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. ' - 'This is a known issue and patches are welcome') + def bindoj_result(future): + while True: + try: + return future.result(0.1) + except KeyboardInterrupt: + raise + except concurrent.futures.TimeoutError: + continue + else: + def bindoj_result(future): + return future.result() + + def interrupt_trigger_iter(fg): + for f in fg: + if not interrupt_trigger[0]: + break + yield f + + spins = [] for idx, (ctx, fragments, info_dict) in enumerate(args): tpe = FTPE(math.ceil(max_workers / max_progress)) - job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe) + job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe) spins.append((tpe, job)) result = True for tpe, job in spins: try: - result = result and job.result() + result = result and bindoj_result(job) except KeyboardInterrupt: interrupt_trigger[0] = False finally: tpe.shutdown(wait=True) - if not interrupt_trigger[0]: + if not interrupt_trigger[0] and not is_live: raise KeyboardInterrupt() + # we expect the user wants to stop and DO WANT the preceding postprocessors to run; + # so returning a intermediate result here instead of KeyboardInterrupt on live return result def download_and_append_fragments( @@ -431,10 +451,11 @@ class FragmentFD(FileDownloader): pack_func = lambda frag_content, _: frag_content def download_fragment(fragment, ctx): + if not interrupt_trigger[0]: + return False, fragment['frag_index'] + frag_index = ctx['fragment_index'] = fragment['frag_index'] ctx['last_error'] = None - if not interrupt_trigger[0]: - return False, frag_index headers = info_dict.get('http_headers', {}).copy() byte_range = fragment.get('byte_range') if byte_range: @@ -500,8 +521,6 @@ class FragmentFD(FileDownloader): self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome') with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments): - if not interrupt_trigger[0]: - break ctx['fragment_filename_sanitized'] = frag_filename ctx['fragment_index'] = frag_index result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx) |