aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/downloader/fragment.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/downloader/fragment.py')
-rw-r--r--yt_dlp/downloader/fragment.py47
1 files changed, 37 insertions, 10 deletions
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
index 04b0f68c0..79c6561c7 100644
--- a/yt_dlp/downloader/fragment.py
+++ b/yt_dlp/downloader/fragment.py
@@ -1,9 +1,10 @@
from __future__ import division, unicode_literals
+import http.client
+import json
+import math
import os
import time
-import json
-from math import ceil
try:
import concurrent.futures
@@ -15,6 +16,7 @@ from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes
from ..compat import (
+ compat_os_name,
compat_urllib_error,
compat_struct_pack,
)
@@ -90,7 +92,7 @@ class FragmentFD(FileDownloader):
self._start_frag_download(ctx, info_dict)
def __do_ytdl_file(self, ctx):
- return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
+ return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
def _read_ytdl_file(self, ctx):
assert 'ytdl_corrupt' not in ctx
@@ -375,17 +377,20 @@ class FragmentFD(FileDownloader):
@params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
all args must be either tuple or list
'''
+ interrupt_trigger = [True]
max_progress = len(args)
if max_progress == 1:
return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
- max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
+ max_workers = self.params.get('concurrent_fragment_downloads', 1)
if max_progress > 1:
self._prepare_multiline_status(max_progress)
def thread_func(idx, ctx, fragments, info_dict, tpe):
ctx['max_progress'] = max_progress
ctx['progress_idx'] = idx
- return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
+ return self.download_and_append_fragments(
+ ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
+ tpe=tpe, interrupt_trigger=interrupt_trigger)
class FTPE(concurrent.futures.ThreadPoolExecutor):
# has to stop this or it's going to wait on the worker thread itself
@@ -393,8 +398,11 @@ class FragmentFD(FileDownloader):
pass
spins = []
+ if compat_os_name == 'nt':
+ self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. '
+ 'This is a known issue and patches are welcome')
for idx, (ctx, fragments, info_dict) in enumerate(args):
- tpe = FTPE(ceil(max_workers / max_progress))
+ tpe = FTPE(math.ceil(max_workers / max_progress))
job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
spins.append((tpe, job))
@@ -402,18 +410,32 @@ class FragmentFD(FileDownloader):
for tpe, job in spins:
try:
result = result and job.result()
+ except KeyboardInterrupt:
+ interrupt_trigger[0] = False
finally:
tpe.shutdown(wait=True)
+ if not interrupt_trigger[0]:
+ raise KeyboardInterrupt()
return result
- def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
+ def download_and_append_fragments(
+ self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
+ tpe=None, interrupt_trigger=None):
+ if not interrupt_trigger:
+ interrupt_trigger = (True, )
+
fragment_retries = self.params.get('fragment_retries', 0)
- is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
+ is_fatal = (
+ ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
+ if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))
+
if not pack_func:
pack_func = lambda frag_content, _: frag_content
def download_fragment(fragment, ctx):
frag_index = ctx['fragment_index'] = fragment['frag_index']
+ if not interrupt_trigger[0]:
+ return False, frag_index
headers = info_dict.get('http_headers', {}).copy()
byte_range = fragment.get('byte_range')
if byte_range:
@@ -428,7 +450,7 @@ class FragmentFD(FileDownloader):
if not success:
return False, frag_index
break
- except compat_urllib_error.HTTPError as err:
+ except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
@@ -466,7 +488,8 @@ class FragmentFD(FileDownloader):
decrypt_fragment = self.decrypter(info_dict)
- max_workers = self.params.get('concurrent_fragment_downloads', 1)
+ max_workers = math.ceil(
+ self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
if can_threaded_download and max_workers > 1:
def _download_fragment(fragment):
@@ -477,6 +500,8 @@ class FragmentFD(FileDownloader):
self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
+ if not interrupt_trigger[0]:
+ break
ctx['fragment_filename_sanitized'] = frag_filename
ctx['fragment_index'] = frag_index
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
@@ -484,6 +509,8 @@ class FragmentFD(FileDownloader):
return False
else:
for fragment in fragments:
+ if not interrupt_trigger[0]:
+ break
frag_content, frag_index = download_fragment(fragment, ctx)
result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
if not result: