diff options
Diffstat (limited to 'yt_dlp/downloader/fragment.py')
-rw-r--r-- | yt_dlp/downloader/fragment.py | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py index ebdef27db..31f946792 100644 --- a/yt_dlp/downloader/fragment.py +++ b/yt_dlp/downloader/fragment.py @@ -3,6 +3,7 @@ from __future__ import division, unicode_literals import os import time import json +from math import ceil try: import concurrent.futures @@ -120,6 +121,7 @@ class FragmentFD(FileDownloader): 'url': frag_url, 'http_headers': headers or info_dict.get('http_headers'), 'request_data': request_data, + 'ctx_id': ctx.get('ctx_id'), } success = ctx['dl'].download(fragment_filename, fragment_info_dict) if not success: @@ -219,6 +221,7 @@ class FragmentFD(FileDownloader): def _start_frag_download(self, ctx, info_dict): resume_len = ctx['complete_frags_downloaded_bytes'] total_frags = ctx['total_frags'] + ctx_id = ctx.get('ctx_id') # This dict stores the download progress, it's updated by the progress # hook state = { @@ -242,6 +245,12 @@ class FragmentFD(FileDownloader): if s['status'] not in ('downloading', 'finished'): return + if ctx_id is not None and s.get('ctx_id') != ctx_id: + return + + state['max_progress'] = ctx.get('max_progress') + state['progress_idx'] = ctx.get('progress_idx') + time_now = time.time() state['elapsed'] = time_now - start frag_total_bytes = s.get('total_bytes') or 0 @@ -301,6 +310,9 @@ class FragmentFD(FileDownloader): 'filename': ctx['filename'], 'status': 'finished', 'elapsed': elapsed, + 'ctx_id': ctx.get('ctx_id'), + 'max_progress': ctx.get('max_progress'), + 'progress_idx': ctx.get('progress_idx'), }, info_dict) def _prepare_external_frag_download(self, ctx): @@ -347,7 +359,44 @@ class FragmentFD(FileDownloader): return decrypt_fragment - def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None): + def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None): + ''' + @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ... + all args must be either tuple or list + ''' + max_progress = len(args) + if max_progress == 1: + return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func) + max_workers = self.params.get('concurrent_fragment_downloads', max_progress) + self._prepare_multiline_status(max_progress) + + def thread_func(idx, ctx, fragments, info_dict, tpe): + ctx['max_progress'] = max_progress + ctx['progress_idx'] = idx + return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe) + + class FTPE(concurrent.futures.ThreadPoolExecutor): + # has to stop this or it's going to wait on the worker thread itself + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + spins = [] + for idx, (ctx, fragments, info_dict) in enumerate(args): + tpe = FTPE(ceil(max_workers / max_progress)) + job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe) + spins.append((tpe, job)) + + result = True + for tpe, job in spins: + try: + result = result and job.result() + finally: + tpe.shutdown(wait=True) + + self._finish_multiline_status() + return True + + def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None): fragment_retries = self.params.get('fragment_retries', 0) is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True) if not pack_func: @@ -416,7 +465,7 @@ class FragmentFD(FileDownloader): return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized') self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome') - with concurrent.futures.ThreadPoolExecutor(max_workers) as pool: + with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments): ctx['fragment_filename_sanitized'] = frag_filename ctx['fragment_index'] = frag_index |