diff options
Diffstat (limited to 'yt_dlp/downloader/hls.py')
-rw-r--r-- | yt_dlp/downloader/hls.py | 188 |
1 files changed, 127 insertions, 61 deletions
diff --git a/yt_dlp/downloader/hls.py b/yt_dlp/downloader/hls.py index 77606b0ed..8b7d51de3 100644 --- a/yt_dlp/downloader/hls.py +++ b/yt_dlp/downloader/hls.py @@ -7,6 +7,11 @@ try: can_decrypt_frag = True except ImportError: can_decrypt_frag = False +try: + import concurrent.futures + can_threaded_download = True +except ImportError: + can_threaded_download = False from ..downloader import _get_real_downloader from .fragment import FragmentFD @@ -19,6 +24,7 @@ from ..compat import ( ) from ..utils import ( parse_m3u8_attributes, + sanitize_open, update_url_query, ) @@ -151,7 +157,6 @@ class HlsFD(FragmentFD): ad_frag_next = False for line in s.splitlines(): line = line.strip() - download_frag = False if line: if not line.startswith('#'): if format_index and discontinuity_count != format_index: @@ -168,13 +173,13 @@ class HlsFD(FragmentFD): if extra_query: frag_url = update_url_query(frag_url, extra_query) - if real_downloader: - fragments.append({ - 'url': frag_url, - 'decrypt_info': decrypt_info, - }) - continue - download_frag = True + fragments.append({ + 'frag_index': frag_index, + 'url': frag_url, + 'decrypt_info': decrypt_info, + 'byte_range': byte_range, + 'media_sequence': media_sequence, + }) elif line.startswith('#EXT-X-MAP'): if format_index and discontinuity_count != format_index: @@ -191,12 +196,14 @@ class HlsFD(FragmentFD): else compat_urlparse.urljoin(man_url, map_info.get('URI'))) if extra_query: frag_url = update_url_query(frag_url, extra_query) - if real_downloader: - fragments.append({ - 'url': frag_url, - 'decrypt_info': decrypt_info, - }) - continue + + fragments.append({ + 'frag_index': frag_index, + 'url': frag_url, + 'decrypt_info': decrypt_info, + 'byte_range': byte_range, + 'media_sequence': media_sequence + }) if map_info.get('BYTERANGE'): splitted_byte_range = map_info.get('BYTERANGE').split('@') @@ -205,7 +212,6 @@ class HlsFD(FragmentFD): 'start': sub_range_start, 'end': sub_range_start + int(splitted_byte_range[0]), } - download_frag = True elif line.startswith('#EXT-X-KEY'): decrypt_url = decrypt_info.get('URI') @@ -236,53 +242,12 @@ class HlsFD(FragmentFD): ad_frag_next = False elif line.startswith('#EXT-X-DISCONTINUITY'): discontinuity_count += 1 + i += 1 + media_sequence += 1 - if download_frag: - count = 0 - headers = info_dict.get('http_headers', {}) - if byte_range: - headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1) - while count <= fragment_retries: - try: - success, frag_content = self._download_fragment( - ctx, frag_url, info_dict, headers) - if not success: - return False - break - except compat_urllib_error.HTTPError as err: - # Unavailable (possibly temporary) fragments may be served. - # First we try to retry then either skip or abort. - # See https://github.com/ytdl-org/youtube-dl/issues/10165, - # https://github.com/ytdl-org/youtube-dl/issues/10448). - count += 1 - if count <= fragment_retries: - self.report_retry_fragment(err, frag_index, count, fragment_retries) - if count > fragment_retries: - if skip_unavailable_fragments: - i += 1 - media_sequence += 1 - self.report_skip_fragment(frag_index) - continue - self.report_error( - 'giving up after %s fragment retries' % fragment_retries) - return False - - if decrypt_info['METHOD'] == 'AES-128': - iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence) - decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen( - self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read() - # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block - # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded, - # not what it decrypts to. - if not test: - frag_content = AES.new( - decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content) - self._append_fragment(ctx, frag_content) - # We only download the first fragment during the test - if test: - break - i += 1 - media_sequence += 1 + # We only download the first fragment during the test + if test: + fragments = [fragments[0] if fragments else None] if real_downloader: info_copy = info_dict.copy() @@ -295,5 +260,106 @@ class HlsFD(FragmentFD): if not success: return False else: + def download_fragment(fragment): + frag_index = fragment['frag_index'] + frag_url = fragment['url'] + decrypt_info = fragment['decrypt_info'] + byte_range = fragment['byte_range'] + media_sequence = fragment['media_sequence'] + + ctx['fragment_index'] = frag_index + + count = 0 + headers = info_dict.get('http_headers', {}) + if byte_range: + headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1) + while count <= fragment_retries: + try: + success, frag_content = self._download_fragment( + ctx, frag_url, info_dict, headers) + if not success: + return False, frag_index + break + except compat_urllib_error.HTTPError as err: + # Unavailable (possibly temporary) fragments may be served. + # First we try to retry then either skip or abort. + # See https://github.com/ytdl-org/youtube-dl/issues/10165, + # https://github.com/ytdl-org/youtube-dl/issues/10448). + count += 1 + if count <= fragment_retries: + self.report_retry_fragment(err, frag_index, count, fragment_retries) + if count > fragment_retries: + return False, frag_index + + if decrypt_info['METHOD'] == 'AES-128': + iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence) + decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen( + self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read() + # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block + # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded, + # not what it decrypts to. + if not test: + frag_content = AES.new( + decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content) + + return frag_content, frag_index + + def append_fragment(frag_content, frag_index): + if frag_content: + fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index) + try: + file, frag_sanitized = sanitize_open(fragment_filename, 'rb') + ctx['fragment_filename_sanitized'] = frag_sanitized + file.close() + self._append_fragment(ctx, frag_content) + return True + except FileNotFoundError: + if skip_unavailable_fragments: + self.report_skip_fragment(frag_index) + return True + else: + self.report_error( + 'fragment %s not found, unable to continue' % frag_index) + return False + else: + if skip_unavailable_fragments: + self.report_skip_fragment(frag_index) + return True + else: + self.report_error( + 'fragment %s not found, unable to continue' % frag_index) + return False + + max_workers = self.params.get('concurrent_fragment_downloads', 1) + if can_threaded_download and max_workers > 1: + self.report_warning('The download speed shown is only of one thread. This is a known issue') + with concurrent.futures.ThreadPoolExecutor(max_workers) as pool: + futures = [pool.submit(download_fragment, fragment) for fragment in fragments] + # timeout must be 0 to return instantly + done, not_done = concurrent.futures.wait(futures, timeout=0) + try: + while not_done: + # Check every 1 second for KeyboardInterrupt + freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1) + done |= freshly_done + except KeyboardInterrupt: + for future in not_done: + future.cancel() + # timeout must be none to cancel + concurrent.futures.wait(not_done, timeout=None) + raise KeyboardInterrupt + results = [future.result() for future in futures] + + for frag_content, frag_index in results: + result = append_fragment(frag_content, frag_index) + if not result: + return False + else: + for fragment in fragments: + frag_content, frag_index = download_fragment(fragment) + result = append_fragment(frag_content, frag_index) + if not result: + return False + self._finish_frag_download(ctx) return True |