aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/downloader/fragment.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/downloader/fragment.py')
-rw-r--r--yt_dlp/downloader/fragment.py39
1 files changed, 29 insertions, 10 deletions
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
index 19c0990d3..7b213cd5f 100644
--- a/yt_dlp/downloader/fragment.py
+++ b/yt_dlp/downloader/fragment.py
@@ -25,6 +25,7 @@ from ..utils import (
error_to_compat_str,
encodeFilename,
sanitized_Request,
+ traverse_obj,
)
@@ -382,6 +383,7 @@ class FragmentFD(FileDownloader):
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if max_progress > 1:
self._prepare_multiline_status(max_progress)
+ is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[]))
def thread_func(idx, ctx, fragments, info_dict, tpe):
ctx['max_progress'] = max_progress
@@ -395,25 +397,43 @@ class FragmentFD(FileDownloader):
def __exit__(self, exc_type, exc_val, exc_tb):
pass
- spins = []
if compat_os_name == 'nt':
- self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. '
- 'This is a known issue and patches are welcome')
+ def bindoj_result(future):
+ while True:
+ try:
+ return future.result(0.1)
+ except KeyboardInterrupt:
+ raise
+ except concurrent.futures.TimeoutError:
+ continue
+ else:
+ def bindoj_result(future):
+ return future.result()
+
+ def interrupt_trigger_iter(fg):
+ for f in fg:
+ if not interrupt_trigger[0]:
+ break
+ yield f
+
+ spins = []
for idx, (ctx, fragments, info_dict) in enumerate(args):
tpe = FTPE(math.ceil(max_workers / max_progress))
- job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
+ job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
spins.append((tpe, job))
result = True
for tpe, job in spins:
try:
- result = result and job.result()
+ result = result and bindoj_result(job)
except KeyboardInterrupt:
interrupt_trigger[0] = False
finally:
tpe.shutdown(wait=True)
- if not interrupt_trigger[0]:
+ if not interrupt_trigger[0] and not is_live:
raise KeyboardInterrupt()
+ # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
+ # so returning a intermediate result here instead of KeyboardInterrupt on live
return result
def download_and_append_fragments(
@@ -431,10 +451,11 @@ class FragmentFD(FileDownloader):
pack_func = lambda frag_content, _: frag_content
def download_fragment(fragment, ctx):
+ if not interrupt_trigger[0]:
+ return False, fragment['frag_index']
+
frag_index = ctx['fragment_index'] = fragment['frag_index']
ctx['last_error'] = None
- if not interrupt_trigger[0]:
- return False, frag_index
headers = info_dict.get('http_headers', {}).copy()
byte_range = fragment.get('byte_range')
if byte_range:
@@ -500,8 +521,6 @@ class FragmentFD(FileDownloader):
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
- if not interrupt_trigger[0]:
- break
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)