aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorJesus <heckyel@riseup.net>2023-09-04 01:59:36 +0800
committerJesus <heckyel@riseup.net>2023-09-04 01:59:36 +0800
commitb3013540b41d1eb77c4803c5fca46f8d75b40fc1 (patch)
tree97735cb0c49f3a2b0f276e1cd90817833d590d69 /test
parenteaeeef9c1d1bedb76fea953c332ef84d53bffe2c (diff)
downloadhypervideo-b3013540b41d1eb77c4803c5fca46f8d75b40fc1.tar.lz
hypervideo-b3013540b41d1eb77c4803c5fca46f8d75b40fc1.tar.xz
hypervideo-b3013540b41d1eb77c4803c5fca46f8d75b40fc1.zip
update from upstream
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py21
-rw-r--r--test/helper.py4
-rw-r--r--test/test_InfoExtractor.py128
-rw-r--r--test/test_YoutubeDL.py168
-rw-r--r--test/test_YoutubeDLCookieJar.py24
-rw-r--r--test/test_aes.py6
-rw-r--r--test/test_age_restriction.py19
-rw-r--r--test/test_compat.py9
-rw-r--r--test/test_config.py227
-rw-r--r--test/test_cookies.py18
-rwxr-xr-xtest/test_download.py9
-rw-r--r--test/test_downloader_external.py139
-rw-r--r--test/test_downloader_http.py12
-rw-r--r--test/test_networking.py1439
-rw-r--r--test/test_networking_utils.py282
-rw-r--r--test/test_plugins.py73
-rw-r--r--test/test_socks.py521
-rw-r--r--test/test_utils.py363
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/_ignore.py5
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/ignore.py12
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/normal.py9
-rw-r--r--test/testdata/yt_dlp_plugins/postprocessor/normal.py5
-rw-r--r--test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py5
-rw-r--r--test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py5
24 files changed, 3223 insertions, 280 deletions
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..48d9288
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,21 @@
+import functools
+import inspect
+
+import pytest
+
+from hypervideo_dl.networking import RequestHandler
+from hypervideo_dl.networking.common import _REQUEST_HANDLERS
+from hypervideo_dl.utils._utils import _YDLLogger as FakeLogger
+
+
+@pytest.fixture
+def handler(request):
+ RH_KEY = request.param
+ if inspect.isclass(RH_KEY) and issubclass(RH_KEY, RequestHandler):
+ handler = RH_KEY
+ elif RH_KEY in _REQUEST_HANDLERS:
+ handler = _REQUEST_HANDLERS[RH_KEY]
+ else:
+ pytest.skip(f'{RH_KEY} request handler is not available')
+
+ return functools.partial(handler, logger=FakeLogger)
diff --git a/test/helper.py b/test/helper.py
index 1dae86f..62f78b4 100644
--- a/test/helper.py
+++ b/test/helper.py
@@ -194,8 +194,8 @@ def sanitize_got_info_dict(got_dict):
'formats', 'thumbnails', 'subtitles', 'automatic_captions', 'comments', 'entries',
# Auto-generated
- 'autonumber', 'playlist', 'format_index', 'video_ext', 'audio_ext', 'duration_string', 'epoch',
- 'fulltitle', 'extractor', 'extractor_key', 'filepath', 'infojson_filename', 'original_url', 'n_entries',
+ 'autonumber', 'playlist', 'format_index', 'video_ext', 'audio_ext', 'duration_string', 'epoch', 'n_entries',
+ 'fulltitle', 'extractor', 'extractor_key', 'filename', 'filepath', 'infojson_filename', 'original_url',
# Only live_status needs to be checked
'is_live', 'was_live',
diff --git a/test/test_InfoExtractor.py b/test/test_InfoExtractor.py
index 529da52..4712c91 100644
--- a/test/test_InfoExtractor.py
+++ b/test/test_InfoExtractor.py
@@ -69,6 +69,7 @@ class TestInfoExtractor(unittest.TestCase):
<meta name="og:test1" content='foo > < bar'/>
<meta name="og:test2" content="foo >//< bar"/>
<meta property=og-test3 content='Ill-formatted opengraph'/>
+ <meta property=og:test4 content=unquoted-value/>
'''
self.assertEqual(ie._og_search_title(html), 'Foo')
self.assertEqual(ie._og_search_description(html), 'Some video\'s description ')
@@ -81,6 +82,7 @@ class TestInfoExtractor(unittest.TestCase):
self.assertEqual(ie._og_search_property(('test0', 'test1'), html), 'foo > < bar')
self.assertRaises(RegexNotFoundError, ie._og_search_property, 'test0', html, None, fatal=True)
self.assertRaises(RegexNotFoundError, ie._og_search_property, ('test0', 'test00'), html, None, fatal=True)
+ self.assertEqual(ie._og_search_property('test4', html), 'unquoted-value')
def test_html_search_meta(self):
ie = self.ie
@@ -915,8 +917,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 263.851,
- 'abr': 0,
}, {
'format_id': '577',
'format_index': None,
@@ -934,8 +934,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 577.61,
- 'abr': 0,
}, {
'format_id': '915',
'format_index': None,
@@ -953,8 +951,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 915.905,
- 'abr': 0,
}, {
'format_id': '1030',
'format_index': None,
@@ -972,8 +968,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 1030.138,
- 'abr': 0,
}, {
'format_id': '1924',
'format_index': None,
@@ -991,8 +985,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 1924.009,
- 'abr': 0,
}],
{
'en': [{
@@ -1404,6 +1396,7 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'none',
'acodec': 'AACL',
'protocol': 'ism',
+ 'audio_channels': 2,
'_download_params': {
'stream_type': 'audio',
'duration': 8880746666,
@@ -1417,9 +1410,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 128,
}, {
'format_id': 'video-100',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1443,9 +1433,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 100,
}, {
'format_id': 'video-326',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1469,9 +1456,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 326,
}, {
'format_id': 'video-698',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1495,9 +1479,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 698,
}, {
'format_id': 'video-1493',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1521,9 +1502,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 1493,
}, {
'format_id': 'video-4482',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1547,9 +1525,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 4482,
}],
{
'eng': [
@@ -1573,61 +1548,57 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'ec-3_test',
'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
[{
- 'format_id': 'audio_deu_1-224',
+ 'format_id': 'audio_deu-127',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'manifest_url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'ext': 'isma',
- 'tbr': 224,
+ 'tbr': 127,
'asr': 48000,
'vcodec': 'none',
- 'acodec': 'EC-3',
+ 'acodec': 'AACL',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ 'audio_channels': 2,
+ '_download_params': {
'stream_type': 'audio',
'duration': 370000000,
'timescale': 10000000,
'width': 0,
'height': 0,
- 'fourcc': 'EC-3',
+ 'fourcc': 'AACL',
'language': 'deu',
- 'codec_private_data': '00063F000000AF87FBA7022DFB42A4D405CD93843BDD0700200F00',
+ 'codec_private_data': '1190',
'sampling_rate': 48000,
- 'channels': 6,
+ 'channels': 2,
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 224,
}, {
- 'format_id': 'audio_deu-127',
+ 'format_id': 'audio_deu_1-224',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'manifest_url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'ext': 'isma',
- 'tbr': 127,
+ 'tbr': 224,
'asr': 48000,
'vcodec': 'none',
- 'acodec': 'AACL',
+ 'acodec': 'EC-3',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ 'audio_channels': 6,
+ '_download_params': {
'stream_type': 'audio',
'duration': 370000000,
'timescale': 10000000,
'width': 0,
'height': 0,
- 'fourcc': 'AACL',
+ 'fourcc': 'EC-3',
'language': 'deu',
- 'codec_private_data': '1190',
+ 'codec_private_data': '00063F000000AF87FBA7022DFB42A4D405CD93843BDD0700200F00',
'sampling_rate': 48000,
- 'channels': 2,
+ 'channels': 6,
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 127,
}, {
'format_id': 'video_deu-23',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1639,8 +1610,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1653,9 +1624,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 23,
}, {
'format_id': 'video_deu-403',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1667,8 +1635,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1681,9 +1649,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 403,
}, {
'format_id': 'video_deu-680',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1695,8 +1660,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1709,9 +1674,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 680,
}, {
'format_id': 'video_deu-1253',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1723,8 +1685,9 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'vbr': 1253,
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1737,9 +1700,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 1253,
}, {
'format_id': 'video_deu-2121',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1751,8 +1711,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1765,9 +1725,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 2121,
}, {
'format_id': 'video_deu-3275',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1779,8 +1736,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1793,9 +1750,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 3275,
}, {
'format_id': 'video_deu-5300',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1807,8 +1761,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1821,9 +1775,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 5300,
}, {
'format_id': 'video_deu-8079',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1835,8 +1786,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1849,9 +1800,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 8079,
}],
{},
),
diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py
index 2d4e827..2810080 100644
--- a/test/test_YoutubeDL.py
+++ b/test/test_YoutubeDL.py
@@ -10,9 +10,8 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import copy
import json
-import urllib.error
-from test.helper import FakeYDL, assertRegexpMatches
+from test.helper import FakeYDL, assertRegexpMatches, try_rm
from hypervideo_dl import YoutubeDL
from hypervideo_dl.compat import compat_os_name
from hypervideo_dl.extractor import YoutubeIE
@@ -25,6 +24,7 @@ from hypervideo_dl.utils import (
int_or_none,
match_filter_func,
)
+from hypervideo_dl.utils.traversal import traverse_obj
TEST_URL = 'http://localhost/sample.mp4'
@@ -632,6 +632,7 @@ class TestYoutubeDL(unittest.TestCase):
outtmpl_info = {
'id': '1234',
+ 'id': '1234',
'ext': 'mp4',
'width': None,
'height': 1080,
@@ -669,7 +670,7 @@ class TestYoutubeDL(unittest.TestCase):
for (name, got), expect in zip((('outtmpl', out), ('filename', fname)), expected):
if callable(expect):
self.assertTrue(expect(got), f'Wrong {name} from {tmpl}')
- else:
+ elif expect is not None:
self.assertEqual(got, expect, f'Wrong {name} from {tmpl}')
# Side-effects
@@ -684,7 +685,8 @@ class TestYoutubeDL(unittest.TestCase):
test('%(id)s.%(ext)s', '1234.mp4')
test('%(duration_string)s', ('27:46:40', '27-46-40'))
test('%(resolution)s', '1080p')
- test('%(playlist_index)s', '001')
+ test('%(playlist_index|)s', '001')
+ test('%(playlist_index&{}!)s', '1!')
test('%(playlist_autonumber)s', '02')
test('%(autonumber)s', '00001')
test('%(autonumber+2)03d', '005', autonumber_start=3)
@@ -755,20 +757,23 @@ class TestYoutubeDL(unittest.TestCase):
test('%(ext)c', 'm')
test('%(id)d %(id)r', "1234 '1234'")
test('%(id)r %(height)r', "'1234' 1080")
+ test('%(title5)a %(height)a', (R"'\xe1\xe9\xed \U0001d400' 1080", None))
test('%(ext)s-%(ext|def)d', 'mp4-def')
- test('%(width|0)04d', '0000')
- test('a%(width|)d', 'a', outtmpl_na_placeholder='none')
+ test('%(width|0)04d', '0')
+ test('a%(width|b)d', 'ab', outtmpl_na_placeholder='none')
FORMATS = self.outtmpl_info['formats']
- sanitize = lambda x: x.replace(':', ':').replace('"', """).replace('\n', ' ')
# Custom type casting
test('%(formats.:.id)l', 'id 1, id 2, id 3')
test('%(formats.:.id)#l', ('id 1\nid 2\nid 3', 'id 1 id 2 id 3'))
test('%(ext)l', 'mp4')
test('%(formats.:.id) 18l', ' id 1, id 2, id 3')
- test('%(formats)j', (json.dumps(FORMATS), sanitize(json.dumps(FORMATS))))
- test('%(formats)#j', (json.dumps(FORMATS, indent=4), sanitize(json.dumps(FORMATS, indent=4))))
+ test('%(formats)j', (json.dumps(FORMATS), None))
+ test('%(formats)#j', (
+ json.dumps(FORMATS, indent=4),
+ json.dumps(FORMATS, indent=4).replace(':', ':').replace('"', """).replace('\n', ' ')
+ ))
test('%(title5).3B', 'á')
test('%(title5)U', 'áéí 𝐀')
test('%(title5)#U', 'a\u0301e\u0301i\u0301 𝐀')
@@ -793,8 +798,8 @@ class TestYoutubeDL(unittest.TestCase):
test('%(title|%)s %(title|%%)s', '% %%')
test('%(id+1-height+3)05d', '00158')
test('%(width+100)05d', 'NA')
- test('%(formats.0) 15s', ('% 15s' % FORMATS[0], '% 15s' % sanitize(str(FORMATS[0]))))
- test('%(formats.0)r', (repr(FORMATS[0]), sanitize(repr(FORMATS[0]))))
+ test('%(formats.0) 15s', ('% 15s' % FORMATS[0], None))
+ test('%(formats.0)r', (repr(FORMATS[0]), None))
test('%(height.0)03d', '001')
test('%(-height.0)04d', '-001')
test('%(formats.-1.id)s', FORMATS[-1]['id'])
@@ -806,7 +811,7 @@ class TestYoutubeDL(unittest.TestCase):
out = json.dumps([{'id': f['id'], 'height.:2': str(f['height'])[:2]}
if 'height' in f else {'id': f['id']}
for f in FORMATS])
- test('%(formats.:.{id,height.:2})j', (out, sanitize(out)))
+ test('%(formats.:.{id,height.:2})j', (out, None))
test('%(formats.:.{id,height}.id)l', ', '.join(f['id'] for f in FORMATS))
test('%(.{id,title})j', ('{"id": "1234"}', '{"id": "1234"}'))
@@ -822,6 +827,11 @@ class TestYoutubeDL(unittest.TestCase):
test('%(title&foo|baz)s.bar', 'baz.bar')
test('%(x,id&foo|baz)s.bar', 'foo.bar')
test('%(x,title&foo|baz)s.bar', 'baz.bar')
+ test('%(id&a\nb|)s', ('a\nb', 'a b'))
+ test('%(id&hi {:>10} {}|)s', 'hi 1234 1234')
+ test(R'%(id&{0} {}|)s', 'NA')
+ test(R'%(id&{0.1}|)s', 'NA')
+ test('%(height&{:,d})S', '1,080')
# Laziness
def gen():
@@ -867,12 +877,12 @@ class TestYoutubeDL(unittest.TestCase):
class SimplePP(PostProcessor):
def run(self, info):
- with open(audiofile, 'wt') as f:
+ with open(audiofile, 'w') as f:
f.write('EXAMPLE')
return [info['filepath']], info
def run_pp(params, PP):
- with open(filename, 'wt') as f:
+ with open(filename, 'w') as f:
f.write('EXAMPLE')
ydl = YoutubeDL(params)
ydl.add_post_processor(PP())
@@ -891,7 +901,7 @@ class TestYoutubeDL(unittest.TestCase):
class ModifierPP(PostProcessor):
def run(self, info):
- with open(info['filepath'], 'wt') as f:
+ with open(info['filepath'], 'w') as f:
f.write('MODIFIED')
return [], info
@@ -1093,11 +1103,6 @@ class TestYoutubeDL(unittest.TestCase):
test_selection({'playlist_items': '-15::2'}, INDICES[1::2], True)
test_selection({'playlist_items': '-15::15'}, [], True)
- def test_urlopen_no_file_protocol(self):
- # see https://github.com/ytdl-org/youtube-dl/issues/8227
- ydl = YDL()
- self.assertRaises(urllib.error.URLError, ydl.urlopen, 'file:///etc/passwd')
-
def test_do_not_override_ie_key_in_url_transparent(self):
ydl = YDL()
@@ -1211,6 +1216,129 @@ class TestYoutubeDL(unittest.TestCase):
self.assertEqual(downloaded['extractor'], 'Video')
self.assertEqual(downloaded['extractor_key'], 'Video')
+ def test_header_cookies(self):
+ from http.cookiejar import Cookie
+
+ ydl = FakeYDL()
+ ydl.report_warning = lambda *_, **__: None
+
+ def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
+ return Cookie(
+ version or 0, name, value, None, False,
+ domain, bool(domain), bool(domain), path, bool(path),
+ secure, expires, False, None, None, rest={})
+
+ _test_url = 'https://yt.dlp/test'
+
+ def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
+ def _test():
+ ydl.cookiejar.clear()
+ ydl._load_cookies(encoded_cookies, autoscope=headers)
+ if headers:
+ ydl._apply_header_cookies(_test_url)
+ data = {'url': _test_url}
+ ydl._calc_headers(data)
+ self.assertCountEqual(
+ map(vars, ydl.cookiejar), map(vars, cookies),
+ 'Extracted cookiejar.Cookie is not the same')
+ if not headers:
+ self.assertEqual(
+ data.get('cookies'), round_trip or encoded_cookies,
+ 'Cookie is not the same as round trip')
+ ydl.__dict__['_YoutubeDL__header_cookies'] = []
+
+ with self.subTest(msg=encoded_cookies):
+ if not error_re:
+ _test()
+ return
+ with self.assertRaisesRegex(Exception, error_re):
+ _test()
+
+ test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
+ test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
+ test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
+ cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
+ cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
+ test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [
+ cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)])
+ test('test="value; "; path=/test; domain=.yt.dlp', [
+ cookie('test', 'value; ', domain='.yt.dlp', path='/test')],
+ round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test')
+ test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')],
+ round_trip='name=""; Domain=.yt.dlp')
+
+ test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
+ test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
+ ydl.deprecated_feature = ydl.report_error
+ test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')
+
+ def test_infojson_cookies(self):
+ TEST_FILE = 'test_infojson_cookies.info.json'
+ TEST_URL = 'https://example.com/example.mp4'
+ COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
+ COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
+
+ ydl = FakeYDL()
+ ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
+
+ def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
+ fmt = {'url': TEST_URL}
+ if fmts_header_cookies:
+ fmt['http_headers'] = COOKIE_HEADER
+ if cookies_field:
+ fmt['cookies'] = COOKIES
+ return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
+
+ def test(initial_info, note):
+ result = {}
+ result['processed'] = ydl.process_ie_result(initial_info)
+ self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+ msg=f'No cookies set in cookiejar after initial process when {note}')
+ ydl.cookiejar.clear()
+ with open(TEST_FILE) as infojson:
+ result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
+ result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
+ self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+ msg=f'No cookies set in cookiejar after final process when {note}')
+ ydl.cookiejar.clear()
+ for key in ('processed', 'loaded', 'final'):
+ info = result[key]
+ self.assertIsNone(
+ traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
+ msg=f'Cookie header not removed in {key} result when {note}')
+ self.assertEqual(
+ traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
+ msg=f'No cookies field found in {key} result when {note}')
+
+ test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
+ test(make_info(info_header_cookies=True), 'info_dict header cokies')
+ test(make_info(fmts_header_cookies=True), 'format header cookies')
+ test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
+ test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
+ test(make_info(cookies_field=True), 'cookies format field')
+ test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
+
+ try_rm(TEST_FILE)
+
+ def test_add_headers_cookie(self):
+ def check_for_cookie_header(result):
+ return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
+
+ ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
+ ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com
+
+ fmt = {'url': 'https://example.com/video.mp4'}
+ result = ydl.process_ie_result(_make_result([fmt]), download=False)
+ self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
+ self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
+ self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
+
+ fmt = {'url': 'https://wrong.com/video.mp4'}
+ result = ydl.process_ie_result(_make_result([fmt]), download=False)
+ self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
+ self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
+ self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_YoutubeDLCookieJar.py b/test/test_YoutubeDLCookieJar.py
index 26922d6..ffeb6f4 100644
--- a/test/test_YoutubeDLCookieJar.py
+++ b/test/test_YoutubeDLCookieJar.py
@@ -11,16 +11,16 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import re
import tempfile
-from hypervideo_dl.utils import YoutubeDLCookieJar
+from hypervideo_dl.cookies import YoutubeDLCookieJar
class TestYoutubeDLCookieJar(unittest.TestCase):
def test_keep_session_cookies(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
tf = tempfile.NamedTemporaryFile(delete=False)
try:
- cookiejar.save(filename=tf.name, ignore_discard=True, ignore_expires=True)
+ cookiejar.save(filename=tf.name)
temp = tf.read().decode()
self.assertTrue(re.search(
r'www\.foobar\.foobar\s+FALSE\s+/\s+TRUE\s+0\s+YoutubeDLExpiresEmpty\s+YoutubeDLExpiresEmptyValue', temp))
@@ -32,7 +32,7 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
def test_strip_httponly_prefix(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
def assert_cookie_has_value(key):
self.assertEqual(cookiejar._cookies['www.foobar.foobar']['/'][key].value, key + '_VALUE')
@@ -42,11 +42,25 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
def test_malformed_cookies(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/malformed_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
# Cookies should be empty since all malformed cookie file entries
# will be ignored
self.assertFalse(cookiejar._cookies)
+ def test_get_cookie_header(self):
+ cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
+ cookiejar.load()
+ header = cookiejar.get_cookie_header('https://www.foobar.foobar')
+ self.assertIn('HTTPONLY_COOKIE', header)
+
+ def test_get_cookies_for_url(self):
+ cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
+ cookiejar.load()
+ cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
+ self.assertEqual(len(cookies), 2)
+ cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
+ self.assertFalse(cookies)
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_aes.py b/test/test_aes.py
index 0f35bc2..cace61c 100644
--- a/test/test_aes.py
+++ b/test/test_aes.py
@@ -26,7 +26,7 @@ from hypervideo_dl.aes import (
key_expansion,
pad_block,
)
-from hypervideo_dl.dependencies import Cryptodome_AES
+from hypervideo_dl.dependencies import Cryptodome
from hypervideo_dl.utils import bytes_to_intlist, intlist_to_bytes
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
@@ -48,7 +48,7 @@ class TestAES(unittest.TestCase):
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
- if Cryptodome_AES:
+ if Cryptodome.AES:
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
@@ -78,7 +78,7 @@ class TestAES(unittest.TestCase):
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
- if Cryptodome_AES:
+ if Cryptodome.AES:
decrypted = aes_gcm_decrypt_and_verify_bytes(
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
diff --git a/test/test_age_restriction.py b/test/test_age_restriction.py
index 034359b..46b0996 100644
--- a/test/test_age_restriction.py
+++ b/test/test_age_restriction.py
@@ -10,6 +10,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import is_download_test, try_rm
from hypervideo_dl import YoutubeDL
+from hypervideo_dl.utils import DownloadError
def _download_restricted(url, filename, age):
@@ -25,10 +26,14 @@ def _download_restricted(url, filename, age):
ydl.add_default_info_extractors()
json_filename = os.path.splitext(filename)[0] + '.info.json'
try_rm(json_filename)
- ydl.download([url])
- res = os.path.exists(json_filename)
- try_rm(json_filename)
- return res
+ try:
+ ydl.download([url])
+ except DownloadError:
+ pass
+ else:
+ return os.path.exists(json_filename)
+ finally:
+ try_rm(json_filename)
@is_download_test
@@ -38,12 +43,12 @@ class TestAgeRestriction(unittest.TestCase):
self.assertFalse(_download_restricted(url, filename, age))
def test_youtube(self):
- self._assert_restricted('07FYdnEawAQ', '07FYdnEawAQ.mp4', 10)
+ self._assert_restricted('HtVdAasjOgU', 'HtVdAasjOgU.mp4', 10)
def test_youporn(self):
self._assert_restricted(
- 'http://www.youporn.com/watch/505835/sex-ed-is-it-safe-to-masturbate-daily/',
- '505835.mp4', 2, old_age=25)
+ 'https://www.youporn.com/watch/16715086/sex-ed-in-detention-18-asmr/',
+ '16715086.mp4', 2, old_age=25)
if __name__ == '__main__':
diff --git a/test/test_compat.py b/test/test_compat.py
index 7a191c0..e1ae193 100644
--- a/test/test_compat.py
+++ b/test/test_compat.py
@@ -9,15 +9,16 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import struct
-import urllib.parse
from hypervideo_dl import compat
+from hypervideo_dl.compat import urllib # isort: split
from hypervideo_dl.compat import (
compat_etree_fromstring,
compat_expanduser,
compat_urllib_parse_unquote,
compat_urllib_parse_urlencode,
)
+from hypervideo_dl.compat.urllib.request import getproxies
class TestCompat(unittest.TestCase):
@@ -28,8 +29,10 @@ class TestCompat(unittest.TestCase):
with self.assertWarns(DeprecationWarning):
compat.WINDOWS_VT_MODE
- # TODO: Test submodule
- # compat.asyncio.events # Must not raise error
+ self.assertEqual(urllib.request.getproxies, getproxies)
+
+ with self.assertWarns(DeprecationWarning):
+ compat.compat_pycrypto_AES # Must not raise error
def test_compat_expanduser(self):
old_home = os.environ.get('HOME')
diff --git a/test/test_config.py b/test/test_config.py
new file mode 100644
index 0000000..8da85a3
--- /dev/null
+++ b/test/test_config.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+import unittest
+import unittest.mock
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import contextlib
+import itertools
+from pathlib import Path
+
+from hypervideo_dl.compat import compat_expanduser
+from hypervideo_dl.options import create_parser, parseOpts
+from hypervideo_dl.utils import Config, get_executable_path
+
+ENVIRON_DEFAULTS = {
+ 'HOME': None,
+ 'XDG_CONFIG_HOME': '/_xdg_config_home/',
+ 'USERPROFILE': 'C:/Users/testing/',
+ 'APPDATA': 'C:/Users/testing/AppData/Roaming/',
+ 'HOMEDRIVE': 'C:/',
+ 'HOMEPATH': 'Users/testing/',
+}
+
+
+@contextlib.contextmanager
+def set_environ(**kwargs):
+ saved_environ = os.environ.copy()
+
+ for name, value in {**ENVIRON_DEFAULTS, **kwargs}.items():
+ if value is None:
+ os.environ.pop(name, None)
+ else:
+ os.environ[name] = value
+
+ yield
+
+ os.environ.clear()
+ os.environ.update(saved_environ)
+
+
+def _generate_expected_groups():
+ xdg_config_home = os.getenv('XDG_CONFIG_HOME') or compat_expanduser('~/.config')
+ appdata_dir = os.getenv('appdata')
+ home_dir = compat_expanduser('~')
+ return {
+ 'Portable': [
+ Path(get_executable_path(), 'hypervideo.conf'),
+ ],
+ 'Home': [
+ Path('hypervideo.conf'),
+ ],
+ 'User': [
+ Path(xdg_config_home, 'hypervideo.conf'),
+ Path(xdg_config_home, 'hypervideo', 'config'),
+ Path(xdg_config_home, 'hypervideo', 'config.txt'),
+ *((
+ Path(appdata_dir, 'hypervideo.conf'),
+ Path(appdata_dir, 'hypervideo', 'config'),
+ Path(appdata_dir, 'hypervideo', 'config.txt'),
+ ) if appdata_dir else ()),
+ Path(home_dir, 'hypervideo.conf'),
+ Path(home_dir, 'hypervideo.conf.txt'),
+ Path(home_dir, '.hypervideo', 'config'),
+ Path(home_dir, '.hypervideo', 'config.txt'),
+ ],
+ 'System': [
+ Path('/etc/hypervideo.conf'),
+ Path('/etc/hypervideo/config'),
+ Path('/etc/hypervideo/config.txt'),
+ ]
+ }
+
+
+class TestConfig(unittest.TestCase):
+ maxDiff = None
+
+ @set_environ()
+ def test_config__ENVIRON_DEFAULTS_sanity(self):
+ expected = make_expected()
+ self.assertCountEqual(
+ set(expected), expected,
+ 'ENVIRON_DEFAULTS produces non unique names')
+
+ def test_config_all_environ_values(self):
+ for name, value in ENVIRON_DEFAULTS.items():
+ for new_value in (None, '', '.', value or '/some/dir'):
+ with set_environ(**{name: new_value}):
+ self._simple_grouping_test()
+
+ def test_config_default_expected_locations(self):
+ files, _ = self._simple_config_test()
+ self.assertEqual(
+ files, make_expected(),
+ 'Not all expected locations have been checked')
+
+ def test_config_default_grouping(self):
+ self._simple_grouping_test()
+
+ def _simple_grouping_test(self):
+ expected_groups = make_expected_groups()
+ for name, group in expected_groups.items():
+ for index, existing_path in enumerate(group):
+ result, opts = self._simple_config_test(existing_path)
+ expected = expected_from_expected_groups(expected_groups, existing_path)
+ self.assertEqual(
+ result, expected,
+ f'The checked locations do not match the expected ({name}, {index})')
+ self.assertEqual(
+ opts.outtmpl['default'], '1',
+ f'The used result value was incorrect ({name}, {index})')
+
+ def _simple_config_test(self, *stop_paths):
+ encountered = 0
+ paths = []
+
+ def read_file(filename, default=[]):
+ nonlocal encountered
+ path = Path(filename)
+ paths.append(path)
+ if path in stop_paths:
+ encountered += 1
+ return ['-o', f'{encountered}']
+
+ with ConfigMock(read_file):
+ _, opts, _ = parseOpts([], False)
+
+ return paths, opts
+
+ @set_environ()
+ def test_config_early_exit_commandline(self):
+ self._early_exit_test(0, '--ignore-config')
+
+ @set_environ()
+ def test_config_early_exit_files(self):
+ for index, _ in enumerate(make_expected(), 1):
+ self._early_exit_test(index)
+
+ def _early_exit_test(self, allowed_reads, *args):
+ reads = 0
+
+ def read_file(filename, default=[]):
+ nonlocal reads
+ reads += 1
+
+ if reads > allowed_reads:
+ self.fail('The remaining config was not ignored')
+ elif reads == allowed_reads:
+ return ['--ignore-config']
+
+ with ConfigMock(read_file):
+ parseOpts(args, False)
+
+ @set_environ()
+ def test_config_override_commandline(self):
+ self._override_test(0, '-o', 'pass')
+
+ @set_environ()
+ def test_config_override_files(self):
+ for index, _ in enumerate(make_expected(), 1):
+ self._override_test(index)
+
+ def _override_test(self, start_index, *args):
+ index = 0
+
+ def read_file(filename, default=[]):
+ nonlocal index
+ index += 1
+
+ if index > start_index:
+ return ['-o', 'fail']
+ elif index == start_index:
+ return ['-o', 'pass']
+
+ with ConfigMock(read_file):
+ _, opts, _ = parseOpts(args, False)
+
+ self.assertEqual(
+ opts.outtmpl['default'], 'pass',
+ 'The earlier group did not override the later ones')
+
+
+@contextlib.contextmanager
+def ConfigMock(read_file=None):
+ with unittest.mock.patch('hypervideo_dl.options.Config') as mock:
+ mock.return_value = Config(create_parser())
+ if read_file is not None:
+ mock.read_file = read_file
+
+ yield mock
+
+
+def make_expected(*filepaths):
+ return expected_from_expected_groups(_generate_expected_groups(), *filepaths)
+
+
+def make_expected_groups(*filepaths):
+ return _filter_expected_groups(_generate_expected_groups(), filepaths)
+
+
+def expected_from_expected_groups(expected_groups, *filepaths):
+ return list(itertools.chain.from_iterable(
+ _filter_expected_groups(expected_groups, filepaths).values()))
+
+
+def _filter_expected_groups(expected, filepaths):
+ if not filepaths:
+ return expected
+
+ result = {}
+ for group, paths in expected.items():
+ new_paths = []
+ for path in paths:
+ new_paths.append(path)
+ if path in filepaths:
+ break
+
+ result[group] = new_paths
+
+ return result
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_cookies.py b/test/test_cookies.py
index ab5dd02..46369ca 100644
--- a/test/test_cookies.py
+++ b/test/test_cookies.py
@@ -49,32 +49,38 @@ class TestCookies(unittest.TestCase):
""" based on https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util_unittest.cc """
test_cases = [
({}, _LinuxDesktopEnvironment.OTHER),
+ ({'DESKTOP_SESSION': 'my_custom_de'}, _LinuxDesktopEnvironment.OTHER),
+ ({'XDG_CURRENT_DESKTOP': 'my_custom_de'}, _LinuxDesktopEnvironment.OTHER),
({'DESKTOP_SESSION': 'gnome'}, _LinuxDesktopEnvironment.GNOME),
({'DESKTOP_SESSION': 'mate'}, _LinuxDesktopEnvironment.GNOME),
- ({'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE),
- ({'DESKTOP_SESSION': 'kde'}, _LinuxDesktopEnvironment.KDE),
+ ({'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE4),
+ ({'DESKTOP_SESSION': 'kde'}, _LinuxDesktopEnvironment.KDE3),
({'DESKTOP_SESSION': 'xfce'}, _LinuxDesktopEnvironment.XFCE),
({'GNOME_DESKTOP_SESSION_ID': 1}, _LinuxDesktopEnvironment.GNOME),
- ({'KDE_FULL_SESSION': 1}, _LinuxDesktopEnvironment.KDE),
+ ({'KDE_FULL_SESSION': 1}, _LinuxDesktopEnvironment.KDE3),
+ ({'KDE_FULL_SESSION': 1, 'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE4),
({'XDG_CURRENT_DESKTOP': 'X-Cinnamon'}, _LinuxDesktopEnvironment.CINNAMON),
+ ({'XDG_CURRENT_DESKTOP': 'Deepin'}, _LinuxDesktopEnvironment.DEEPIN),
({'XDG_CURRENT_DESKTOP': 'GNOME'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'GNOME:GNOME-Classic'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'GNOME : GNOME-Classic'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'Unity', 'DESKTOP_SESSION': 'gnome-fallback'}, _LinuxDesktopEnvironment.GNOME),
- ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '5'}, _LinuxDesktopEnvironment.KDE),
- ({'XDG_CURRENT_DESKTOP': 'KDE'}, _LinuxDesktopEnvironment.KDE),
+ ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '5'}, _LinuxDesktopEnvironment.KDE5),
+ ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '6'}, _LinuxDesktopEnvironment.KDE6),
+ ({'XDG_CURRENT_DESKTOP': 'KDE'}, _LinuxDesktopEnvironment.KDE4),
({'XDG_CURRENT_DESKTOP': 'Pantheon'}, _LinuxDesktopEnvironment.PANTHEON),
+ ({'XDG_CURRENT_DESKTOP': 'UKUI'}, _LinuxDesktopEnvironment.UKUI),
({'XDG_CURRENT_DESKTOP': 'Unity'}, _LinuxDesktopEnvironment.UNITY),
({'XDG_CURRENT_DESKTOP': 'Unity:Unity7'}, _LinuxDesktopEnvironment.UNITY),
({'XDG_CURRENT_DESKTOP': 'Unity:Unity8'}, _LinuxDesktopEnvironment.UNITY),
]
for env, expected_desktop_environment in test_cases:
- self.assertEqual(_get_linux_desktop_environment(env), expected_desktop_environment)
+ self.assertEqual(_get_linux_desktop_environment(env, Logger()), expected_desktop_environment)
def test_chrome_cookie_decryptor_linux_derive_key(self):
key = LinuxChromeCookieDecryptor.derive_key(b'abc')
diff --git a/test/test_download.py b/test/test_download.py
index 6f77343..7c05413 100755
--- a/test/test_download.py
+++ b/test/test_download.py
@@ -10,10 +10,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import collections
import hashlib
-import http.client
import json
-import socket
-import urllib.error
from test.helper import (
assertGreaterEqual,
@@ -29,6 +26,7 @@ from test.helper import (
import hypervideo_dl.YoutubeDL # isort: split
from hypervideo_dl.extractor import get_info_extractor
+from hypervideo_dl.networking.exceptions import HTTPError, TransportError
from hypervideo_dl.utils import (
DownloadError,
ExtractorError,
@@ -162,8 +160,7 @@ def generator(test_case, tname):
force_generic_extractor=params.get('force_generic_extractor', False))
except (DownloadError, ExtractorError) as err:
# Check if the exception is not a network related one
- if (err.exc_info[0] not in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine)
- or (err.exc_info[0] == urllib.error.HTTPError and err.exc_info[1].code == 503)):
+ if not isinstance(err.exc_info[1], (TransportError, UnavailableVideoError)) or (isinstance(err.exc_info[1], HTTPError) and err.exc_info[1].status == 503):
err.msg = f'{getattr(err, "msg", err)} ({tname})'
raise
@@ -249,7 +246,7 @@ def generator(test_case, tname):
# extractor returns full results even with extract_flat
res_tcs = [{'info_dict': e} for e in res_dict['entries']]
try_rm_tcs_files(res_tcs)
-
+ ydl.close()
return test_template
diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py
new file mode 100644
index 0000000..3200e74
--- /dev/null
+++ b/test/test_downloader_external.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import http.cookiejar
+
+from test.helper import FakeYDL
+from hypervideo_dl.downloader.external import (
+ Aria2cFD,
+ AxelFD,
+ CurlFD,
+ FFmpegFD,
+ HttpieFD,
+ WgetFD,
+)
+
+TEST_COOKIE = {
+ 'version': 0,
+ 'name': 'test',
+ 'value': 'ytdlp',
+ 'port': None,
+ 'port_specified': False,
+ 'domain': '.example.com',
+ 'domain_specified': True,
+ 'domain_initial_dot': False,
+ 'path': '/',
+ 'path_specified': True,
+ 'secure': False,
+ 'expires': None,
+ 'discard': False,
+ 'comment': None,
+ 'comment_url': None,
+ 'rest': {},
+}
+
+TEST_INFO = {'url': 'http://www.example.com/'}
+
+
+class TestHttpieFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = HttpieFD(ydl, {})
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['http', '--download', '--output', 'test', 'http://www.example.com/'])
+
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp'])
+
+
+class TestAxelFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = AxelFD(ydl, {})
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['axel', '-o', 'test', '--', 'http://www.example.com/'])
+
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['axel', '-o', 'test', '-H', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/'])
+
+
+class TestWgetFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = WgetFD(ydl, {})
+ self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
+ # Test cookiejar tempfile arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
+
+
+class TestCurlFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = CurlFD(ydl, {})
+ self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
+ self.assertIn('test=ytdlp', downloader._make_cmd('test', TEST_INFO))
+
+
+class TestAria2cFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = Aria2cFD(ydl, {})
+ downloader._make_cmd('test', TEST_INFO)
+ self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
+
+ # Test cookiejar tempfile arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ cmd = downloader._make_cmd('test', TEST_INFO)
+ self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
+
+
+@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
+class TestFFmpegFD(unittest.TestCase):
+ _args = []
+
+ def _test_cmd(self, args):
+ self._args = args
+
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = FFmpegFD(ydl, {})
+ downloader._debug_cmd = self._test_cmd
+
+ downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/',
+ '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+ # Test cookies arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n',
+ '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+ # Test with non-url input (ffmpeg reads from stdin '-' for websockets)
+ downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_downloader_http.py b/test/test_downloader_http.py
index 3b65859..a422141 100644
--- a/test/test_downloader_http.py
+++ b/test/test_downloader_http.py
@@ -16,6 +16,7 @@ from test.helper import http_server_port, try_rm
from hypervideo_dl import YoutubeDL
from hypervideo_dl.downloader.http import HttpFD
from hypervideo_dl.utils import encodeFilename
+from hypervideo_dl.utils._utils import _YDLLogger as FakeLogger
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -67,17 +68,6 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
assert False
-class FakeLogger:
- def debug(self, msg):
- pass
-
- def warning(self, msg):
- pass
-
- def error(self, msg):
- pass
-
-
class TestHttpFD(unittest.TestCase):
def setUp(self):
self.httpd = http.server.HTTPServer(
diff --git a/test/test_networking.py b/test/test_networking.py
new file mode 100644
index 0000000..ca7ecf0
--- /dev/null
+++ b/test/test_networking.py
@@ -0,0 +1,1439 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import gzip
+import http.client
+import http.cookiejar
+import http.server
+import io
+import pathlib
+import random
+import ssl
+import tempfile
+import threading
+import time
+import urllib.error
+import urllib.request
+import warnings
+import zlib
+from email.message import Message
+from http.cookiejar import CookieJar
+
+from test.helper import FakeYDL, http_server_port
+from hypervideo_dl.cookies import YoutubeDLCookieJar
+from hypervideo_dl.dependencies import brotli
+from hypervideo_dl.networking import (
+ HEADRequest,
+ PUTRequest,
+ Request,
+ RequestDirector,
+ RequestHandler,
+ Response,
+)
+from hypervideo_dl.networking._urllib import UrllibRH
+from hypervideo_dl.networking.exceptions import (
+ CertificateVerifyError,
+ HTTPError,
+ IncompleteRead,
+ NoSupportingHandlers,
+ RequestError,
+ SSLError,
+ TransportError,
+ UnsupportedRequest,
+)
+from hypervideo_dl.utils._utils import _YDLLogger as FakeLogger
+from hypervideo_dl.utils.networking import HTTPHeaderDict
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def _build_proxy_handler(name):
+ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
+ proxy_name = name
+
+ def log_message(self, format, *args):
+ pass
+
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain; charset=utf-8')
+ self.end_headers()
+ self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode())
+ return HTTPTestRequestHandler
+
+
+class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
+ protocol_version = 'HTTP/1.1'
+
+ def log_message(self, format, *args):
+ pass
+
+ def _headers(self):
+ payload = str(self.headers).encode()
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+
+ def _redirect(self):
+ self.send_response(int(self.path[len('/redirect_'):]))
+ self.send_header('Location', '/method')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+
+ def _method(self, method, payload=None):
+ self.send_response(200)
+ self.send_header('Content-Length', str(len(payload or '')))
+ self.send_header('Method', method)
+ self.end_headers()
+ if payload:
+ self.wfile.write(payload)
+
+ def _status(self, status):
+ payload = f'<html>{status} NOT FOUND</html>'.encode()
+ self.send_response(int(status))
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+
+ def _read_data(self):
+ if 'Content-Length' in self.headers:
+ return self.rfile.read(int(self.headers['Content-Length']))
+
+ def do_POST(self):
+ data = self._read_data() + str(self.headers).encode()
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('POST', data)
+ elif self.path.startswith('/headers'):
+ self._headers()
+ else:
+ self._status(404)
+
+ def do_HEAD(self):
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('HEAD')
+ else:
+ self._status(404)
+
+ def do_PUT(self):
+ data = self._read_data() + str(self.headers).encode()
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('PUT', data)
+ else:
+ self._status(404)
+
+ def do_GET(self):
+ if self.path == '/video.html':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/vid.mp4':
+ payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
+ self.send_response(200)
+ self.send_header('Content-Type', 'video/mp4')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/%E4%B8%AD%E6%96%87.html':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/%c7%9f':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/redirect_loop'):
+ self.send_response(301)
+ self.send_header('Location', self.path)
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/redirect_dotsegments':
+ self.send_response(301)
+ # redirect to /headers but with dot segments before
+ self.send_header('Location', '/a/b/./../../headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('GET', str(self.headers).encode())
+ elif self.path.startswith('/headers'):
+ self._headers()
+ elif self.path.startswith('/308-to-headers'):
+ self.send_response(308)
+ self.send_header('Location', '/headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/trailing_garbage':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Encoding', 'gzip')
+ buf = io.BytesIO()
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+ f.write(payload)
+ compressed = buf.getvalue() + b'trailing garbage'
+ self.send_header('Content-Length', str(len(compressed)))
+ self.end_headers()
+ self.wfile.write(compressed)
+ elif self.path == '/302-non-ascii-redirect':
+ new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
+ self.send_response(301)
+ self.send_header('Location', new_url)
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/content-encoding':
+ encodings = self.headers.get('ytdl-encoding', '')
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ for encoding in filter(None, (e.strip() for e in encodings.split(','))):
+ if encoding == 'br' and brotli:
+ payload = brotli.compress(payload)
+ elif encoding == 'gzip':
+ buf = io.BytesIO()
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+ f.write(payload)
+ payload = buf.getvalue()
+ elif encoding == 'deflate':
+ payload = zlib.compress(payload)
+ elif encoding == 'unsupported':
+ payload = b'raw'
+ break
+ else:
+ self._status(415)
+ return
+ self.send_response(200)
+ self.send_header('Content-Encoding', encodings)
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/gen_'):
+ payload = b'<html></html>'
+ self.send_response(int(self.path[len('/gen_'):]))
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/incompleteread'):
+ payload = b'<html></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', '234234')
+ self.end_headers()
+ self.wfile.write(payload)
+ self.finish()
+ elif self.path.startswith('/timeout_'):
+ time.sleep(int(self.path[len('/timeout_'):]))
+ self._headers()
+ elif self.path == '/source_address':
+ payload = str(self.client_address[0]).encode()
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ self.finish()
+ else:
+ self._status(404)
+
+ def send_header(self, keyword, value):
+ """
+ Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
+ This is against what is defined in RFC 3986, however we need to test we support this
+ since some sites incorrectly do this.
+ """
+ if keyword.lower() == 'connection':
+ return super().send_header(keyword, value)
+
+ if not hasattr(self, '_headers_buffer'):
+ self._headers_buffer = []
+
+ self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
+
+
+def validate_and_send(rh, req):
+ rh.validate(req)
+ return rh.send(req)
+
+
+class TestRequestHandlerBase:
+ @classmethod
+ def setup_class(cls):
+ cls.http_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ cls.http_port = http_server_port(cls.http_httpd)
+ cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
+ # FIXME: we should probably stop the http server thread after each test
+ # See: https://github.com/hypervideo/hypervideo/pull/7094#discussion_r1199746041
+ cls.http_server_thread.daemon = True
+ cls.http_server_thread.start()
+
+ # HTTPS server
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ cls.https_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.load_cert_chain(certfn, None)
+ cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
+ cls.https_port = http_server_port(cls.https_httpd)
+ cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
+ cls.https_server_thread.daemon = True
+ cls.https_server_thread.start()
+
+
+class TestHTTPRequestHandler(TestRequestHandlerBase):
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_verify_cert(self, handler):
+ with handler() as rh:
+ with pytest.raises(CertificateVerifyError):
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+
+ with handler(verify=False) as rh:
+ r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+ assert r.status == 200
+ r.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_ssl_error(self, handler):
+ # HTTPS server with too old TLS version
+ # XXX: is there a better way to test this than to create a new server?
+ https_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
+ https_port = http_server_port(https_httpd)
+ https_server_thread = threading.Thread(target=https_httpd.serve_forever)
+ https_server_thread.daemon = True
+ https_server_thread.start()
+
+ with handler(verify=False) as rh:
+ with pytest.raises(SSLError, match='sslv3 alert handshake failure') as exc_info:
+ validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
+ assert not issubclass(exc_info.type, CertificateVerifyError)
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_percent_encode(self, handler):
+ with handler() as rh:
+ # Unicode characters should be encoded with uppercase percent-encoding
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
+ assert res.status == 200
+ res.close()
+ # don't normalize existing percent encodings
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
+ assert res.status == 200
+ res.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_remove_dot_segments(self, handler):
+ with handler() as rh:
+ # This isn't a comprehensive test,
+ # but it should be enough to check whether the handler is removing dot segments
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/a/b/./../../headers'))
+ assert res.status == 200
+ assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+ res.close()
+
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_dotsegments'))
+ assert res.status == 200
+ assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+ res.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_unicode_path_redirection(self, handler):
+ with handler() as rh:
+ r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
+ assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
+ r.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_raise_http_error(self, handler):
+ with handler() as rh:
+ for bad_status in (400, 500, 599, 302):
+ with pytest.raises(HTTPError):
+ validate_and_send(rh, Request('http://127.0.0.1:%d/gen_%d' % (self.http_port, bad_status)))
+
+ # Should not raise an error
+ validate_and_send(rh, Request('http://127.0.0.1:%d/gen_200' % self.http_port)).close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_response_url(self, handler):
+ with handler() as rh:
+ # Response url should be that of the last url in redirect chain
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
+ assert res.url == f'http://127.0.0.1:{self.http_port}/method'
+ res.close()
+ res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
+ assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
+ res2.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_redirect(self, handler):
+ with handler() as rh:
+ def do_req(redirect_status, method, assert_no_content=False):
+ data = b'testdata' if method in ('POST', 'PUT') else None
+ res = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
+
+ headers = b''
+ data_sent = b''
+ if data is not None:
+ data_sent += res.read(len(data))
+ if data_sent != data:
+ headers += data_sent
+ data_sent = b''
+
+ headers += res.read()
+
+ if assert_no_content or data is None:
+ assert b'Content-Type' not in headers
+ assert b'Content-Length' not in headers
+ else:
+ assert b'Content-Type' in headers
+ assert b'Content-Length' in headers
+
+ return data_sent.decode(), res.headers.get('method', '')
+
+ # A 303 must either use GET or HEAD for subsequent request
+ assert do_req(303, 'POST', True) == ('', 'GET')
+ assert do_req(303, 'HEAD') == ('', 'HEAD')
+
+ assert do_req(303, 'PUT', True) == ('', 'GET')
+
+ # 301 and 302 turn POST only into a GET
+ assert do_req(301, 'POST', True) == ('', 'GET')
+ assert do_req(301, 'HEAD') == ('', 'HEAD')
+ assert do_req(302, 'POST', True) == ('', 'GET')
+ assert do_req(302, 'HEAD') == ('', 'HEAD')
+
+ assert do_req(301, 'PUT') == ('testdata', 'PUT')
+ assert do_req(302, 'PUT') == ('testdata', 'PUT')
+
+ # 307 and 308 should not change method
+ for m in ('POST', 'PUT'):
+ assert do_req(307, m) == ('testdata', m)
+ assert do_req(308, m) == ('testdata', m)
+
+ assert do_req(307, 'HEAD') == ('', 'HEAD')
+ assert do_req(308, 'HEAD') == ('', 'HEAD')
+
+ # These should not redirect and instead raise an HTTPError
+ for code in (300, 304, 305, 306):
+ with pytest.raises(HTTPError):
+ do_req(code, 'GET')
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_request_cookie_header(self, handler):
+ # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
+ with handler() as rh:
+ # Specified Cookie header should be used
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/headers',
+ headers={'Cookie': 'test=test'})).read().decode()
+ assert 'Cookie: test=test' in res
+
+ # Specified Cookie header should be removed on any redirect
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/308-to-headers',
+ headers={'Cookie': 'test=test'})).read().decode()
+ assert 'Cookie: test=test' not in res
+
+ # Specified Cookie header should override global cookiejar for that request
+ cookiejar = YoutubeDLCookieJar()
+ cookiejar.set_cookie(http.cookiejar.Cookie(
+ version=0, name='test', value='ytdlp', port=None, port_specified=False,
+ domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
+ path_specified=True, secure=False, expires=None, discard=False, comment=None,
+ comment_url=None, rest={}))
+
+ with handler(cookiejar=cookiejar) as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test'})).read()
+ assert b'Cookie: test=ytdlp' not in data
+ assert b'Cookie: test=test' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_redirect_loop(self, handler):
+ with handler() as rh:
+ with pytest.raises(HTTPError, match='redirect loop'):
+ validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_incompleteread(self, handler):
+ with handler(timeout=2) as rh:
+ with pytest.raises(IncompleteRead):
+ validate_and_send(rh, Request('http://127.0.0.1:%d/incompleteread' % self.http_port)).read()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_cookies(self, handler):
+ cookiejar = YoutubeDLCookieJar()
+ cookiejar.set_cookie(http.cookiejar.Cookie(
+ 0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
+ False, '/headers', True, False, None, False, None, None, {}))
+
+ with handler(cookiejar=cookiejar) as rh:
+ data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
+ assert b'Cookie: test=ytdlp' in data
+
+ # Per request
+ with handler() as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
+ assert b'Cookie: test=ytdlp' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_headers(self, handler):
+
+ with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
+ # Global Headers
+ data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
+ assert b'Test1: test' in data
+
+ # Per request headers, merged with global
+ data = validate_and_send(rh, Request(
+ f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read()
+ assert b'Test1: test' in data
+ assert b'Test2: changed' in data
+ assert b'Test2: test2' not in data
+ assert b'Test3: test3' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_timeout(self, handler):
+ with handler() as rh:
+ # Default timeout is 20 seconds, so this should go through
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_3'))
+
+ with handler(timeout=0.5) as rh:
+ with pytest.raises(TransportError):
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
+
+ # Per request timeout, should override handler timeout
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_source_address(self, handler):
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(source_address=source_address) as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
+ assert source_address == data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_gzip_trailing_garbage(self, handler):
+ with handler() as rh:
+ data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
+ assert data == '<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
+ def test_brotli(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'br'}))
+ assert res.headers.get('Content-Encoding') == 'br'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_deflate(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'deflate'}))
+ assert res.headers.get('Content-Encoding') == 'deflate'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_gzip(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'gzip'}))
+ assert res.headers.get('Content-Encoding') == 'gzip'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_multiple_encodings(self, handler):
+ with handler() as rh:
+ for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': pair}))
+ assert res.headers.get('Content-Encoding') == pair
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_unsupported_encoding(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'unsupported'}))
+ assert res.headers.get('Content-Encoding') == 'unsupported'
+ assert res.read() == b'raw'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_read(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
+ assert res.readable()
+ assert res.read(1) == b'H'
+ assert res.read(3) == b'ost'
+
+
+class TestHTTPProxy(TestRequestHandlerBase):
+ @classmethod
+ def setup_class(cls):
+ super().setup_class()
+ # HTTP Proxy server
+ cls.proxy = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), _build_proxy_handler('normal'))
+ cls.proxy_port = http_server_port(cls.proxy)
+ cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever)
+ cls.proxy_thread.daemon = True
+ cls.proxy_thread.start()
+
+ # Geo proxy server
+ cls.geo_proxy = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), _build_proxy_handler('geo'))
+ cls.geo_port = http_server_port(cls.geo_proxy)
+ cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever)
+ cls.geo_proxy_thread.daemon = True
+ cls.geo_proxy_thread.start()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_proxy(self, handler):
+ http_proxy = f'http://127.0.0.1:{self.proxy_port}'
+ geo_proxy = f'http://127.0.0.1:{self.geo_port}'
+
+ # Test global http proxy
+ # Test per request http proxy
+ # Test per request http proxy disables proxy
+ url = 'http://foo.com/bar'
+
+ # Global HTTP proxy
+ with handler(proxies={'http': http_proxy}) as rh:
+ res = validate_and_send(rh, Request(url)).read().decode()
+ assert res == f'normal: {url}'
+
+ # Per request proxy overrides global
+ res = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
+ assert res == f'geo: {url}'
+
+ # and setting to None disables all proxies for that request
+ real_url = f'http://127.0.0.1:{self.http_port}/headers'
+ res = validate_and_send(
+ rh, Request(real_url, proxies={'http': None})).read().decode()
+ assert res != f'normal: {real_url}'
+ assert 'Accept' in res
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_noproxy(self, handler):
+ with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
+ # NO_PROXY
+ for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
+ nop_response = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy})).read().decode(
+ 'utf-8')
+ assert 'Accept' in nop_response
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_allproxy(self, handler):
+ url = 'http://foo.com/bar'
+ with handler() as rh:
+ response = validate_and_send(rh, Request(url, proxies={'all': f'http://127.0.0.1:{self.proxy_port}'})).read().decode(
+ 'utf-8')
+ assert response == f'normal: {url}'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_proxy_with_idn(self, handler):
+ with handler(proxies={
+ 'http': f'http://127.0.0.1:{self.proxy_port}',
+ }) as rh:
+ url = 'http://中文.tw/'
+ response = rh.send(Request(url)).read().decode()
+ # b'xn--fiq228c' is '中文'.encode('idna')
+ assert response == 'normal: http://xn--fiq228c.tw/'
+
+
+class TestClientCertificate:
+
+ @classmethod
+ def setup_class(cls):
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
+ cacertfn = os.path.join(cls.certdir, 'ca.crt')
+ cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.verify_mode = ssl.CERT_REQUIRED
+ sslctx.load_verify_locations(cafile=cacertfn)
+ sslctx.load_cert_chain(certfn, None)
+ cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)
+ cls.port = http_server_port(cls.httpd)
+ cls.server_thread = threading.Thread(target=cls.httpd.serve_forever)
+ cls.server_thread.daemon = True
+ cls.server_thread.start()
+
+ def _run_test(self, handler, **handler_kwargs):
+ with handler(
+ # Disable client-side validation of unacceptable self-signed testcert.pem
+ # The test is of a check on the server side, so unaffected
+ verify=False,
+ **handler_kwargs,
+ ) as rh:
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html')).read().decode()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_combined_nopass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_nocombined_nopass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'client.crt'),
+ 'client_certificate_key': os.path.join(self.certdir, 'client.key'),
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_combined_pass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
+ 'client_certificate_password': 'foobar',
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_nocombined_pass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'client.crt'),
+ 'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
+ 'client_certificate_password': 'foobar',
+ })
+
+
+class TestUrllibRequestHandler(TestRequestHandlerBase):
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_file_urls(self, handler):
+ # See https://github.com/ytdl-org/youtube-dl/issues/8227
+ tf = tempfile.NamedTemporaryFile(delete=False)
+ tf.write(b'foobar')
+ tf.close()
+ req = Request(pathlib.Path(tf.name).as_uri())
+ with handler() as rh:
+ with pytest.raises(UnsupportedRequest):
+ rh.validate(req)
+
+ # Test that urllib never loaded FileHandler
+ with pytest.raises(TransportError):
+ rh.send(req)
+
+ with handler(enable_file_urls=True) as rh:
+ res = validate_and_send(rh, req)
+ assert res.read() == b'foobar'
+ res.close()
+
+ os.unlink(tf.name)
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_error_returns_content(self, handler):
+ # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
+ def get_response():
+ with handler() as rh:
+ # headers url
+ try:
+ validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
+ except HTTPError as e:
+ return e.response
+
+ assert get_response().read() == b'<html></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_verify_cert_error_text(self, handler):
+ # Check the output of the error message
+ with handler() as rh:
+ with pytest.raises(
+ CertificateVerifyError,
+ match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
+ ):
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ @pytest.mark.parametrize('req,match,version_check', [
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
+ # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
+ (
+ Request('http://127.0.0.1', method='GET\n'),
+ 'method can\'t contain control characters',
+ lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
+ ),
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
+ # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
+ (
+ Request('http://127.0.0. 1', method='GET'),
+ 'URL can\'t contain control characters',
+ lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
+ ),
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
+ (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
+ ])
+ def test_httplib_validation_errors(self, handler, req, match, version_check):
+ if version_check and version_check(sys.version_info):
+ pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')
+
+ with handler() as rh:
+ with pytest.raises(RequestError, match=match) as exc_info:
+ validate_and_send(rh, req)
+ assert not isinstance(exc_info.value, TransportError)
+
+
+def run_validation(handler, error, req, **handler_kwargs):
+ with handler(**handler_kwargs) as rh:
+ if error:
+ with pytest.raises(error):
+ rh.validate(req)
+ else:
+ rh.validate(req)
+
+
+class TestRequestHandlerValidation:
+
+ class ValidationRH(RequestHandler):
+ def _send(self, request):
+ raise RequestError('test')
+
+ class NoCheckRH(ValidationRH):
+ _SUPPORTED_FEATURES = None
+ _SUPPORTED_PROXY_SCHEMES = None
+ _SUPPORTED_URL_SCHEMES = None
+
+ def _check_extensions(self, extensions):
+ extensions.clear()
+
+ class HTTPSupportedRH(ValidationRH):
+ _SUPPORTED_URL_SCHEMES = ('http',)
+
+ URL_SCHEME_TESTS = [
+ # scheme, expected to fail, handler kwargs
+ ('Urllib', [
+ ('http', False, {}),
+ ('https', False, {}),
+ ('data', False, {}),
+ ('ftp', False, {}),
+ ('file', UnsupportedRequest, {}),
+ ('file', False, {'enable_file_urls': True}),
+ ]),
+ (NoCheckRH, [('http', False, {})]),
+ (ValidationRH, [('http', UnsupportedRequest, {})])
+ ]
+
+ PROXY_SCHEME_TESTS = [
+ # scheme, expected to fail
+ ('Urllib', [
+ ('http', False),
+ ('https', UnsupportedRequest),
+ ('socks4', False),
+ ('socks4a', False),
+ ('socks5', False),
+ ('socks5h', False),
+ ('socks', UnsupportedRequest),
+ ]),
+ (NoCheckRH, [('http', False)]),
+ (HTTPSupportedRH, [('http', UnsupportedRequest)]),
+ ]
+
+ PROXY_KEY_TESTS = [
+ # key, expected to fail
+ ('Urllib', [
+ ('all', False),
+ ('unrelated', False),
+ ]),
+ (NoCheckRH, [('all', False)]),
+ (HTTPSupportedRH, [('all', UnsupportedRequest)]),
+ (HTTPSupportedRH, [('no', UnsupportedRequest)]),
+ ]
+
+ EXTENSION_TESTS = [
+ ('Urllib', [
+ ({'cookiejar': 'notacookiejar'}, AssertionError),
+ ({'cookiejar': YoutubeDLCookieJar()}, False),
+ ({'cookiejar': CookieJar()}, AssertionError),
+ ({'timeout': 1}, False),
+ ({'timeout': 'notatimeout'}, AssertionError),
+ ({'unsupported': 'value'}, UnsupportedRequest),
+ ]),
+ (NoCheckRH, [
+ ({'cookiejar': 'notacookiejar'}, False),
+ ({'somerandom': 'test'}, False), # but any extension is allowed through
+ ]),
+ ]
+
+ @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
+ (handler_tests[0], scheme, fail, handler_kwargs)
+ for handler_tests in URL_SCHEME_TESTS
+ for scheme, fail, handler_kwargs in handler_tests[1]
+
+ ], indirect=['handler'])
+ def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
+ run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))
+
+ @pytest.mark.parametrize('handler,fail', [('Urllib', False)], indirect=['handler'])
+ def test_no_proxy(self, handler, fail):
+ run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})
+
+ @pytest.mark.parametrize('handler,proxy_key,fail', [
+ (handler_tests[0], proxy_key, fail)
+ for handler_tests in PROXY_KEY_TESTS
+ for proxy_key, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_proxy_key(self, handler, proxy_key, fail):
+ run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})
+
+ @pytest.mark.parametrize('handler,scheme,fail', [
+ (handler_tests[0], scheme, fail)
+ for handler_tests in PROXY_SCHEME_TESTS
+ for scheme, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_proxy_scheme(self, handler, scheme, fail):
+ run_validation(handler, fail, Request('http://', proxies={'http': f'{scheme}://example.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={'http': f'{scheme}://example.com'})
+
+ @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH], indirect=True)
+ def test_empty_proxy(self, handler):
+ run_validation(handler, False, Request('http://', proxies={'http': None}))
+ run_validation(handler, False, Request('http://'), proxies={'http': None})
+
+ @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_invalid_proxy_url(self, handler, proxy_url):
+ run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))
+
+ @pytest.mark.parametrize('handler,extensions,fail', [
+ (handler_tests[0], extensions, fail)
+ for handler_tests in EXTENSION_TESTS
+ for extensions, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_extension(self, handler, extensions, fail):
+ run_validation(
+ handler, fail, Request('http://', extensions=extensions))
+
+ def test_invalid_request_type(self):
+ rh = self.ValidationRH(logger=FakeLogger())
+ for method in (rh.validate, rh.send):
+ with pytest.raises(TypeError, match='Expected an instance of Request'):
+ method('not a request')
+
+
+class FakeResponse(Response):
+ def __init__(self, request):
+ # XXX: we could make request part of standard response interface
+ self.request = request
+ super().__init__(fp=io.BytesIO(b''), headers={}, url=request.url)
+
+
+class FakeRH(RequestHandler):
+
+ def _validate(self, request):
+ return
+
+ def _send(self, request: Request):
+ if request.url.startswith('ssl://'):
+ raise SSLError(request.url[len('ssl://'):])
+ return FakeResponse(request)
+
+
+class FakeRHYDL(FakeYDL):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._request_director = self.build_request_director([FakeRH])
+
+
+class TestRequestDirector:
+
+ def test_handler_operations(self):
+ director = RequestDirector(logger=FakeLogger())
+ handler = FakeRH(logger=FakeLogger())
+ director.add_handler(handler)
+ assert director.handlers.get(FakeRH.RH_KEY) is handler
+
+ # Handler should overwrite
+ handler2 = FakeRH(logger=FakeLogger())
+ director.add_handler(handler2)
+ assert director.handlers.get(FakeRH.RH_KEY) is not handler
+ assert director.handlers.get(FakeRH.RH_KEY) is handler2
+ assert len(director.handlers) == 1
+
+ class AnotherFakeRH(FakeRH):
+ pass
+ director.add_handler(AnotherFakeRH(logger=FakeLogger()))
+ assert len(director.handlers) == 2
+ assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY
+
+ director.handlers.pop(FakeRH.RH_KEY, None)
+ assert director.handlers.get(FakeRH.RH_KEY) is None
+ assert len(director.handlers) == 1
+
+ # RequestErrors should passthrough
+ with pytest.raises(SSLError):
+ director.send(Request('ssl://something'))
+
+ def test_send(self):
+ director = RequestDirector(logger=FakeLogger())
+ with pytest.raises(RequestError):
+ director.send(Request('any://'))
+ director.add_handler(FakeRH(logger=FakeLogger()))
+ assert isinstance(director.send(Request('http://')), FakeResponse)
+
+ def test_unsupported_handlers(self):
+ class SupportedRH(RequestHandler):
+ _SUPPORTED_URL_SCHEMES = ['http']
+
+ def _send(self, request: Request):
+ return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
+
+ director = RequestDirector(logger=FakeLogger())
+ director.add_handler(SupportedRH(logger=FakeLogger()))
+ director.add_handler(FakeRH(logger=FakeLogger()))
+
+ # First should take preference
+ assert director.send(Request('http://')).read() == b'supported'
+ assert director.send(Request('any://')).read() == b''
+
+ director.handlers.pop(FakeRH.RH_KEY)
+ with pytest.raises(NoSupportingHandlers):
+ director.send(Request('any://'))
+
+ def test_unexpected_error(self):
+ director = RequestDirector(logger=FakeLogger())
+
+ class UnexpectedRH(FakeRH):
+ def _send(self, request: Request):
+ raise TypeError('something')
+
+ director.add_handler(UnexpectedRH(logger=FakeLogger))
+ with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
+ director.send(Request('any://'))
+
+ director.handlers.clear()
+ assert len(director.handlers) == 0
+
+ # Should not be fatal
+ director.add_handler(FakeRH(logger=FakeLogger()))
+ director.add_handler(UnexpectedRH(logger=FakeLogger))
+ assert director.send(Request('any://'))
+
+ def test_preference(self):
+ director = RequestDirector(logger=FakeLogger())
+ director.add_handler(FakeRH(logger=FakeLogger()))
+
+ class SomeRH(RequestHandler):
+ _SUPPORTED_URL_SCHEMES = ['http']
+
+ def _send(self, request: Request):
+ return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
+
+ def some_preference(rh, request):
+ return (0 if not isinstance(rh, SomeRH)
+ else 100 if 'prefer' in request.headers
+ else -1)
+
+ director.add_handler(SomeRH(logger=FakeLogger()))
+ director.preferences.add(some_preference)
+
+ assert director.send(Request('http://')).read() == b''
+ assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'
+
+
+# XXX: do we want to move this to test_YoutubeDL.py?
+class TestYoutubeDLNetworking:
+
+ @staticmethod
+ def build_handler(ydl, handler: RequestHandler = FakeRH):
+ return ydl.build_request_director([handler]).handlers.get(handler.RH_KEY)
+
+ def test_compat_opener(self):
+ with FakeYDL() as ydl:
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ assert isinstance(ydl._opener, urllib.request.OpenerDirector)
+
+ @pytest.mark.parametrize('proxy,expected', [
+ ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
+ ('', {'all': '__noproxy__'}),
+ (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
+ ])
+ def test_proxy(self, proxy, expected):
+ old_http_proxy = os.environ.get('HTTP_PROXY')
+ try:
+ os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
+ with FakeYDL({'proxy': proxy}) as ydl:
+ assert ydl.proxies == expected
+ finally:
+ if old_http_proxy:
+ os.environ['HTTP_PROXY'] = old_http_proxy
+
+ def test_compat_request(self):
+ with FakeRHYDL() as ydl:
+ assert ydl.urlopen('test://')
+ urllib_req = urllib.request.Request('http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
+ urllib_req.add_unredirected_header('Cookie', 'bob=bob')
+ urllib_req.timeout = 2
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ req = ydl.urlopen(urllib_req).request
+ assert req.url == urllib_req.get_full_url()
+ assert req.data == urllib_req.data
+ assert req.method == urllib_req.get_method()
+ assert 'X-Test' in req.headers
+ assert 'Cookie' in req.headers
+ assert req.extensions.get('timeout') == 2
+
+ with pytest.raises(AssertionError):
+ ydl.urlopen(None)
+
+ def test_extract_basic_auth(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('http://user:pass@foo.bar'))
+ assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
+
+ def test_sanitize_url(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('httpss://foo.bar'))
+ assert res.request.url == 'https://foo.bar'
+
+ def test_file_urls_error(self):
+ # use urllib handler
+ with FakeYDL() as ydl:
+ with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
+ ydl.urlopen('file://')
+
+ def test_legacy_server_connect_error(self):
+ with FakeRHYDL() as ydl:
+ for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
+ ydl.urlopen(f'ssl://{error}')
+
+ with pytest.raises(SSLError, match='testerror'):
+ ydl.urlopen('ssl://testerror')
+
+ @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
+ ('http', '__noproxy__', None),
+ ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
+ ('https', 'example.com', 'http://example.com'),
+ ('https', '//example.com', 'http://example.com'),
+ ('https', 'socks5://example.com', 'socks5h://example.com'),
+ ('http', 'socks://example.com', 'socks4://example.com'),
+ ('http', 'socks4://example.com', 'socks4://example.com'),
+ ('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
+ ])
+ def test_clean_proxy(self, proxy_key, proxy_url, expected):
+ # proxies should be cleaned in urlopen()
+ with FakeRHYDL() as ydl:
+ req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
+ assert req.proxies[proxy_key] == expected
+
+ # and should also be cleaned when building the handler
+ env_key = f'{proxy_key.upper()}_PROXY'
+ old_env_proxy = os.environ.get(env_key)
+ try:
+ os.environ[env_key] = proxy_url # ensure that provided proxies override env
+ with FakeYDL() as ydl:
+ rh = self.build_handler(ydl)
+ assert rh.proxies[proxy_key] == expected
+ finally:
+ if old_env_proxy:
+ os.environ[env_key] = old_env_proxy
+
+ def test_clean_proxy_header(self):
+ with FakeRHYDL() as ydl:
+ req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
+ assert 'ytdl-request-proxy' not in req.headers
+ assert req.proxies == {'all': 'http://foo.bar'}
+
+ with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
+ rh = self.build_handler(ydl)
+ assert 'ytdl-request-proxy' not in rh.headers
+ assert rh.proxies == {'all': 'http://foo.bar'}
+
+ def test_clean_header(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
+ assert 'Youtubedl-no-compression' not in res.request.headers
+ assert res.request.headers.get('Accept-Encoding') == 'identity'
+
+ with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
+ rh = self.build_handler(ydl)
+ assert 'Youtubedl-no-compression' not in rh.headers
+ assert rh.headers.get('Accept-Encoding') == 'identity'
+
+ def test_build_handler_params(self):
+ with FakeYDL({
+ 'http_headers': {'test': 'testtest'},
+ 'socket_timeout': 2,
+ 'proxy': 'http://127.0.0.1:8080',
+ 'source_address': '127.0.0.45',
+ 'debug_printtraffic': True,
+ 'compat_opts': ['no-certifi'],
+ 'nocheckcertificate': True,
+ 'legacyserverconnect': True,
+ }) as ydl:
+ rh = self.build_handler(ydl)
+ assert rh.headers.get('test') == 'testtest'
+ assert 'Accept' in rh.headers # ensure std_headers are still there
+ assert rh.timeout == 2
+ assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
+ assert rh.source_address == '127.0.0.45'
+ assert rh.verbose is True
+ assert rh.prefer_system_certs is True
+ assert rh.verify is False
+ assert rh.legacy_ssl_support is True
+
+ @pytest.mark.parametrize('ydl_params', [
+ {'client_certificate': 'fakecert.crt'},
+ {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
+ {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
+ {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
+ ])
+ def test_client_certificate(self, ydl_params):
+ with FakeYDL(ydl_params) as ydl:
+ rh = self.build_handler(ydl)
+ assert rh._client_cert == ydl_params # XXX: Too bound to implementation
+
+ def test_urllib_file_urls(self):
+ with FakeYDL({'enable_file_urls': False}) as ydl:
+ rh = self.build_handler(ydl, UrllibRH)
+ assert rh.enable_file_urls is False
+
+ with FakeYDL({'enable_file_urls': True}) as ydl:
+ rh = self.build_handler(ydl, UrllibRH)
+ assert rh.enable_file_urls is True
+
+
+class TestRequest:
+
+ def test_query(self):
+ req = Request('http://example.com?q=something', query={'v': 'xyz'})
+ assert req.url == 'http://example.com?q=something&v=xyz'
+
+ req.update(query={'v': '123'})
+ assert req.url == 'http://example.com?q=something&v=123'
+ req.update(url='http://example.com', query={'v': 'xyz'})
+ assert req.url == 'http://example.com?v=xyz'
+
+ def test_method(self):
+ req = Request('http://example.com')
+ assert req.method == 'GET'
+ req.data = b'test'
+ assert req.method == 'POST'
+ req.data = None
+ assert req.method == 'GET'
+ req.data = b'test2'
+ req.method = 'PUT'
+ assert req.method == 'PUT'
+ req.data = None
+ assert req.method == 'PUT'
+ with pytest.raises(TypeError):
+ req.method = 1
+
+ def test_request_helpers(self):
+ assert HEADRequest('http://example.com').method == 'HEAD'
+ assert PUTRequest('http://example.com').method == 'PUT'
+
+ def test_headers(self):
+ req = Request('http://example.com', headers={'tesT': 'test'})
+ assert req.headers == HTTPHeaderDict({'test': 'test'})
+ req.update(headers={'teSt2': 'test2'})
+ assert req.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})
+
+ req.headers = new_headers = HTTPHeaderDict({'test': 'test'})
+ assert req.headers == HTTPHeaderDict({'test': 'test'})
+ assert req.headers is new_headers
+
+ # test converts dict to case insensitive dict
+ req.headers = new_headers = {'test2': 'test2'}
+ assert isinstance(req.headers, HTTPHeaderDict)
+ assert req.headers is not new_headers
+
+ with pytest.raises(TypeError):
+ req.headers = None
+
+ def test_data_type(self):
+ req = Request('http://example.com')
+ assert req.data is None
+ # test bytes is allowed
+ req.data = b'test'
+ assert req.data == b'test'
+ # test iterable of bytes is allowed
+ i = [b'test', b'test2']
+ req.data = i
+ assert req.data == i
+
+ # test file-like object is allowed
+ f = io.BytesIO(b'test')
+ req.data = f
+ assert req.data == f
+
+ # common mistake: test str not allowed
+ with pytest.raises(TypeError):
+ req.data = 'test'
+ assert req.data != 'test'
+
+ # common mistake: test dict is not allowed
+ with pytest.raises(TypeError):
+ req.data = {'test': 'test'}
+ assert req.data != {'test': 'test'}
+
+ def test_content_length_header(self):
+ req = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
+ assert req.headers.get('Content-Length') == '0'
+
+ req.data = b'test'
+ assert 'Content-Length' not in req.headers
+
+ req = Request('http://example.com', headers={'Content-Length': '10'})
+ assert 'Content-Length' not in req.headers
+
+ def test_content_type_header(self):
+ req = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
+ assert req.headers.get('Content-Type') == 'test'
+ req.data = b'test2'
+ assert req.headers.get('Content-Type') == 'test'
+ req.data = None
+ assert 'Content-Type' not in req.headers
+ req.data = b'test3'
+ assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
+
+ def test_update_req(self):
+ req = Request('http://example.com')
+ assert req.data is None
+ assert req.method == 'GET'
+ assert 'Content-Type' not in req.headers
+ # Test that zero-byte payloads will be sent
+ req.update(data=b'')
+ assert req.data == b''
+ assert req.method == 'POST'
+ assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
+
+ def test_proxies(self):
+ req = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
+ assert req.proxies == {'http': 'http://127.0.0.1:8080'}
+
+ def test_extensions(self):
+ req = Request(url='http://example.com', extensions={'timeout': 2})
+ assert req.extensions == {'timeout': 2}
+
+ def test_copy(self):
+ req = Request(
+ url='http://example.com',
+ extensions={'cookiejar': CookieJar()},
+ headers={'Accept-Encoding': 'br'},
+ proxies={'http': 'http://127.0.0.1'},
+ data=[b'123']
+ )
+ req_copy = req.copy()
+ assert req_copy is not req
+ assert req_copy.url == req.url
+ assert req_copy.headers == req.headers
+ assert req_copy.headers is not req.headers
+ assert req_copy.proxies == req.proxies
+ assert req_copy.proxies is not req.proxies
+
+ # Data is not able to be copied
+ assert req_copy.data == req.data
+ assert req_copy.data is req.data
+
+ # Shallow copy extensions
+ assert req_copy.extensions is not req.extensions
+ assert req_copy.extensions['cookiejar'] == req.extensions['cookiejar']
+
+ # Subclasses are copied by default
+ class AnotherRequest(Request):
+ pass
+
+ req = AnotherRequest(url='http://127.0.0.1')
+ assert isinstance(req.copy(), AnotherRequest)
+
+ def test_url(self):
+ req = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
+ assert req.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'
+
+ assert Request(url='//example.com').url == 'http://example.com'
+
+ with pytest.raises(TypeError):
+ Request(url='https://').url = None
+
+
+class TestResponse:
+
+ @pytest.mark.parametrize('reason,status,expected', [
+ ('custom', 200, 'custom'),
+ (None, 404, 'Not Found'), # fallback status
+ ('', 403, 'Forbidden'),
+ (None, 999, None)
+ ])
+ def test_reason(self, reason, status, expected):
+ res = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
+ assert res.reason == expected
+
+ def test_headers(self):
+ headers = Message()
+ headers.add_header('Test', 'test')
+ headers.add_header('Test', 'test2')
+ headers.add_header('content-encoding', 'br')
+ res = Response(io.BytesIO(b''), headers=headers, url='test://')
+ assert res.headers.get_all('test') == ['test', 'test2']
+ assert 'Content-Encoding' in res.headers
+
+ def test_get_header(self):
+ headers = Message()
+ headers.add_header('Set-Cookie', 'cookie1')
+ headers.add_header('Set-cookie', 'cookie2')
+ headers.add_header('Test', 'test')
+ headers.add_header('Test', 'test2')
+ res = Response(io.BytesIO(b''), headers=headers, url='test://')
+ assert res.get_header('test') == 'test, test2'
+ assert res.get_header('set-Cookie') == 'cookie1'
+ assert res.get_header('notexist', 'default') == 'default'
+
+ def test_compat(self):
+ res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ assert res.code == res.getcode() == res.status
+ assert res.geturl() == res.url
+ assert res.info() is res.headers
+ assert res.getheader('test') == res.get_header('test')
diff --git a/test/test_networking_utils.py b/test/test_networking_utils.py
new file mode 100644
index 0000000..71cd214
--- /dev/null
+++ b/test/test_networking_utils.py
@@ -0,0 +1,282 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import contextlib
+import io
+import platform
+import random
+import ssl
+import urllib.error
+import warnings
+
+from hypervideo_dl.cookies import YoutubeDLCookieJar
+from hypervideo_dl.dependencies import certifi
+from hypervideo_dl.networking import Response
+from hypervideo_dl.networking._helper import (
+ InstanceStoreMixin,
+ add_accept_encoding_header,
+ get_redirect_method,
+ make_socks_proxy_opts,
+ select_proxy,
+ ssl_load_certs,
+)
+from hypervideo_dl.networking.exceptions import (
+ HTTPError,
+ IncompleteRead,
+ _CompatHTTPError,
+)
+from hypervideo_dl.socks import ProxyType
+from hypervideo_dl.utils.networking import HTTPHeaderDict
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+class TestNetworkingUtils:
+
+ def test_select_proxy(self):
+ proxies = {
+ 'all': 'socks5://example.com',
+ 'http': 'http://example.com:1080',
+ 'no': 'bypass.example.com,yt-dl.org'
+ }
+
+ assert select_proxy('https://example.com', proxies) == proxies['all']
+ assert select_proxy('http://example.com', proxies) == proxies['http']
+ assert select_proxy('http://bypass.example.com', proxies) is None
+ assert select_proxy('https://yt-dl.org', proxies) is None
+
+ @pytest.mark.parametrize('socks_proxy,expected', [
+ ('socks5h://example.com', {
+ 'proxytype': ProxyType.SOCKS5,
+ 'addr': 'example.com',
+ 'port': 1080,
+ 'rdns': True,
+ 'username': None,
+ 'password': None
+ }),
+ ('socks5://user:@example.com:5555', {
+ 'proxytype': ProxyType.SOCKS5,
+ 'addr': 'example.com',
+ 'port': 5555,
+ 'rdns': False,
+ 'username': 'user',
+ 'password': ''
+ }),
+ ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
+ 'proxytype': ProxyType.SOCKS4,
+ 'addr': '127.0.0.1',
+ 'port': 1080,
+ 'rdns': False,
+ 'username': 'u@ser',
+ 'password': 'pa ss'
+ }),
+ ('socks4a://:pa%20ss@127.0.0.1', {
+ 'proxytype': ProxyType.SOCKS4A,
+ 'addr': '127.0.0.1',
+ 'port': 1080,
+ 'rdns': True,
+ 'username': '',
+ 'password': 'pa ss'
+ })
+ ])
+ def test_make_socks_proxy_opts(self, socks_proxy, expected):
+ assert make_socks_proxy_opts(socks_proxy) == expected
+
+ def test_make_socks_proxy_unknown(self):
+ with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
+ make_socks_proxy_opts('socks://127.0.0.1')
+
+ @pytest.mark.skipif(not certifi, reason='certifi is not installed')
+ def test_load_certifi(self):
+ context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context_certifi.load_verify_locations(cafile=certifi.where())
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ssl_load_certs(context, use_certifi=True)
+ assert context.get_ca_certs() == context_certifi.get_ca_certs()
+
+ context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context_default.load_default_certs()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ssl_load_certs(context, use_certifi=False)
+ assert context.get_ca_certs() == context_default.get_ca_certs()
+
+ if context_default.get_ca_certs() == context_certifi.get_ca_certs():
+ pytest.skip('System uses certifi as default. The test is not valid')
+
+ @pytest.mark.parametrize('method,status,expected', [
+ ('GET', 303, 'GET'),
+ ('HEAD', 303, 'HEAD'),
+ ('PUT', 303, 'GET'),
+ ('POST', 301, 'GET'),
+ ('HEAD', 301, 'HEAD'),
+ ('POST', 302, 'GET'),
+ ('HEAD', 302, 'HEAD'),
+ ('PUT', 302, 'PUT'),
+ ('POST', 308, 'POST'),
+ ('POST', 307, 'POST'),
+ ('HEAD', 308, 'HEAD'),
+ ('HEAD', 307, 'HEAD'),
+ ])
+ def test_get_redirect_method(self, method, status, expected):
+ assert get_redirect_method(method, status) == expected
+
+ @pytest.mark.parametrize('headers,supported_encodings,expected', [
+ ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
+ ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
+ ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
+ ])
+ def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
+ headers = HTTPHeaderDict(headers)
+ add_accept_encoding_header(headers, supported_encodings)
+ assert headers == HTTPHeaderDict(expected)
+
+
+class TestInstanceStoreMixin:
+
+ class FakeInstanceStoreMixin(InstanceStoreMixin):
+ def _create_instance(self, **kwargs):
+ return random.randint(0, 1000000)
+
+ def _close_instance(self, instance):
+ pass
+
+ def test_mixin(self):
+ mixin = self.FakeInstanceStoreMixin()
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
+
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
+
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
+
+ assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
+
+ assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
+
+ cookiejar = YoutubeDLCookieJar()
+ assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
+
+ assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
+
+ # Different order
+ assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
+
+ m = mixin._get_instance(t=1234)
+ assert mixin._get_instance(t=1234) == m
+ mixin._clear_instances()
+ assert mixin._get_instance(t=1234) != m
+
+
+class TestNetworkingExceptions:
+
+ @staticmethod
+ def create_response(status):
+ return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
+
+ @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
+ def test_http_error(self, http_error_class):
+
+ response = self.create_response(403)
+ error = http_error_class(response)
+
+ assert error.status == 403
+ assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
+ assert error.reason == response.reason
+ assert error.response is response
+
+ data = error.response.read()
+ assert data == b'test'
+ assert repr(error) == '<HTTPError 403: Forbidden>'
+
+ @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
+ def test_redirect_http_error(self, http_error_class):
+ response = self.create_response(301)
+ error = http_error_class(response, redirect_loop=True)
+ assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
+ assert error.reason == 'Moved Permanently'
+
+ def test_compat_http_error(self):
+ response = self.create_response(403)
+ error = _CompatHTTPError(HTTPError(response))
+ assert isinstance(error, HTTPError)
+ assert isinstance(error, urllib.error.HTTPError)
+
+ @contextlib.contextmanager
+ def raises_deprecation_warning():
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ yield
+
+ if len(w) == 0:
+ pytest.fail('Did not raise DeprecationWarning')
+ if len(w) > 1:
+ pytest.fail(f'Raised multiple warnings: {w}')
+
+ if not issubclass(w[-1].category, DeprecationWarning):
+ pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
+ w.clear()
+
+ with raises_deprecation_warning():
+ assert error.code == 403
+
+ with raises_deprecation_warning():
+ assert error.getcode() == 403
+
+ with raises_deprecation_warning():
+ assert error.hdrs is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.info() is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.headers is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.filename == error.response.url
+
+ with raises_deprecation_warning():
+ assert error.url == error.response.url
+
+ with raises_deprecation_warning():
+ assert error.geturl() == error.response.url
+
+ # Passthrough file operations
+ with raises_deprecation_warning():
+ assert error.read() == b'test'
+
+ with raises_deprecation_warning():
+ assert not error.closed
+
+ with raises_deprecation_warning():
+ # Technically Response operations are also passed through, which should not be used.
+ assert error.get_header('test') == 'test'
+
+ # Should not raise a warning
+ error.close()
+
+ @pytest.mark.skipif(
+ platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
+ def test_compat_http_error_autoclose(self):
+ # Compat HTTPError should not autoclose response
+ response = self.create_response(403)
+ _CompatHTTPError(HTTPError(response))
+ assert not response.closed
+
+ def test_incomplete_read_error(self):
+ error = IncompleteRead(b'test', 3, cause='test')
+ assert isinstance(error, IncompleteRead)
+ assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
+ assert str(error) == error.msg == '4 bytes read, 3 more expected'
+ assert error.partial == b'test'
+ assert error.expected == 3
+ assert error.cause == 'test'
+
+ error = IncompleteRead(b'aaa')
+ assert repr(error) == '<IncompleteRead: 3 bytes read>'
+ assert str(error) == '3 bytes read'
diff --git a/test/test_plugins.py b/test/test_plugins.py
new file mode 100644
index 0000000..38ca87c
--- /dev/null
+++ b/test/test_plugins.py
@@ -0,0 +1,73 @@
+import importlib
+import os
+import shutil
+import sys
+import unittest
+from pathlib import Path
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+TEST_DATA_DIR = Path(os.path.dirname(os.path.abspath(__file__)), 'testdata')
+sys.path.append(str(TEST_DATA_DIR))
+importlib.invalidate_caches()
+
+from hypervideo_dl.plugins import PACKAGE_NAME, directories, load_plugins
+
+
+class TestPlugins(unittest.TestCase):
+
+ TEST_PLUGIN_DIR = TEST_DATA_DIR / PACKAGE_NAME
+
+ def test_directories_containing_plugins(self):
+ self.assertIn(self.TEST_PLUGIN_DIR, map(Path, directories()))
+
+ def test_extractor_classes(self):
+ for module_name in tuple(sys.modules):
+ if module_name.startswith(f'{PACKAGE_NAME}.extractor'):
+ del sys.modules[module_name]
+ plugins_ie = load_plugins('extractor', 'IE')
+
+ self.assertIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
+ self.assertIn('NormalPluginIE', plugins_ie.keys())
+
+ # don't load modules with underscore prefix
+ self.assertFalse(
+ f'{PACKAGE_NAME}.extractor._ignore' in sys.modules.keys(),
+ 'loaded module beginning with underscore')
+ self.assertNotIn('IgnorePluginIE', plugins_ie.keys())
+
+ # Don't load extractors with underscore prefix
+ self.assertNotIn('_IgnoreUnderscorePluginIE', plugins_ie.keys())
+
+ # Don't load extractors not specified in __all__ (if supplied)
+ self.assertNotIn('IgnoreNotInAllPluginIE', plugins_ie.keys())
+ self.assertIn('InAllPluginIE', plugins_ie.keys())
+
+ def test_postprocessor_classes(self):
+ plugins_pp = load_plugins('postprocessor', 'PP')
+ self.assertIn('NormalPluginPP', plugins_pp.keys())
+
+ def test_importing_zipped_module(self):
+ zip_path = TEST_DATA_DIR / 'zipped_plugins.zip'
+ shutil.make_archive(str(zip_path)[:-4], 'zip', str(zip_path)[:-4])
+ sys.path.append(str(zip_path)) # add zip to search paths
+ importlib.invalidate_caches() # reset the import caches
+
+ try:
+ for plugin_type in ('extractor', 'postprocessor'):
+ package = importlib.import_module(f'{PACKAGE_NAME}.{plugin_type}')
+ self.assertIn(zip_path / PACKAGE_NAME / plugin_type, map(Path, package.__path__))
+
+ plugins_ie = load_plugins('extractor', 'IE')
+ self.assertIn('ZippedPluginIE', plugins_ie.keys())
+
+ plugins_pp = load_plugins('postprocessor', 'PP')
+ self.assertIn('ZippedPluginPP', plugins_pp.keys())
+
+ finally:
+ sys.path.remove(str(zip_path))
+ os.remove(zip_path)
+ importlib.invalidate_caches() # reset the import caches
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_socks.py b/test/test_socks.py
index 6651290..73047ec 100644
--- a/test/test_socks.py
+++ b/test/test_socks.py
@@ -1,113 +1,470 @@
#!/usr/bin/env python3
-
# Allow direct execution
import os
import sys
+import threading
import unittest
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+import pytest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+import abc
+import contextlib
+import enum
+import functools
+import http.server
+import json
import random
-import subprocess
-import urllib.request
+import socket
+import struct
+import time
+from socketserver import (
+ BaseRequestHandler,
+ StreamRequestHandler,
+ ThreadingTCPServer,
+)
-from test.helper import FakeYDL, get_params, is_download_test
+from test.helper import http_server_port
+from hypervideo_dl.networking import Request
+from hypervideo_dl.networking.exceptions import ProxyError, TransportError
+from hypervideo_dl.socks import (
+ SOCKS4_REPLY_VERSION,
+ SOCKS4_VERSION,
+ SOCKS5_USER_AUTH_SUCCESS,
+ SOCKS5_USER_AUTH_VERSION,
+ SOCKS5_VERSION,
+ Socks5AddressType,
+ Socks5Auth,
+)
+SOCKS5_USER_AUTH_FAILURE = 0x1
-@is_download_test
-class TestMultipleSocks(unittest.TestCase):
- @staticmethod
- def _check_params(attrs):
- params = get_params()
- for attr in attrs:
- if attr not in params:
- print('Missing %s. Skipping.' % attr)
- return
- return params
- def test_proxy_http(self):
- params = self._check_params(['primary_proxy', 'primary_server_ip'])
- if params is None:
+class Socks4CD(enum.IntEnum):
+ REQUEST_GRANTED = 90
+ REQUEST_REJECTED_OR_FAILED = 91
+ REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
+ REQUEST_REJECTED_DIFFERENT_USERID = 93
+
+
+class Socks5Reply(enum.IntEnum):
+ SUCCEEDED = 0x0
+ GENERAL_FAILURE = 0x1
+ CONNECTION_NOT_ALLOWED = 0x2
+ NETWORK_UNREACHABLE = 0x3
+ HOST_UNREACHABLE = 0x4
+ CONNECTION_REFUSED = 0x5
+ TTL_EXPIRED = 0x6
+ COMMAND_NOT_SUPPORTED = 0x7
+ ADDRESS_TYPE_NOT_SUPPORTED = 0x8
+
+
+class SocksTestRequestHandler(BaseRequestHandler):
+
+ def __init__(self, *args, socks_info=None, **kwargs):
+ self.socks_info = socks_info
+ super().__init__(*args, **kwargs)
+
+
+class SocksProxyHandler(BaseRequestHandler):
+ def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
+ self.socks_kwargs = socks_server_kwargs or {}
+ self.request_handler_class = request_handler_class
+ super().__init__(*args, **kwargs)
+
+
+class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
+
+ # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
+ # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
+
+ def handle(self):
+ sleep = self.socks_kwargs.get('sleep')
+ if sleep:
+ time.sleep(sleep)
+ version, nmethods = self.connection.recv(2)
+ assert version == SOCKS5_VERSION
+ methods = list(self.connection.recv(nmethods))
+
+ auth = self.socks_kwargs.get('auth')
+
+ if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL({
- 'proxy': params['primary_proxy']
- })
- self.assertEqual(
- ydl.urlopen('http://yt-dl.org/ip').read().decode(),
- params['primary_server_ip'])
-
- def test_proxy_https(self):
- params = self._check_params(['primary_proxy', 'primary_server_ip'])
- if params is None:
+
+ elif Socks5Auth.AUTH_USER_PASS in methods:
+ self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
+
+ _, user_len = struct.unpack('!BB', self.connection.recv(2))
+ username = self.connection.recv(user_len).decode()
+ pass_len = ord(self.connection.recv(1))
+ password = self.connection.recv(pass_len).decode()
+
+ if username == auth[0] and password == auth[1]:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
+ else:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
+ self.server.close_request(self.request)
+ return
+
+ elif Socks5Auth.AUTH_NONE in methods:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
+ else:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL({
- 'proxy': params['primary_proxy']
- })
- self.assertEqual(
- ydl.urlopen('https://yt-dl.org/ip').read().decode(),
- params['primary_server_ip'])
-
- def test_secondary_proxy_http(self):
- params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
- if params is None:
+
+ version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
+ socks_info = {
+ 'version': version,
+ 'auth_methods': methods,
+ 'command': command,
+ 'client_address': self.client_address,
+ 'ipv4_address': None,
+ 'domain_address': None,
+ 'ipv6_address': None,
+ }
+ if address_type == Socks5AddressType.ATYP_IPV4:
+ socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
+ elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
+ socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
+ elif address_type == Socks5AddressType.ATYP_IPV6:
+ socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
+ else:
+ self.server.close_request(self.request)
+
+ socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
+
+ # dummy response, the returned IP is just a placeholder
+ self.connection.sendall(struct.pack(
+ '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
+
+ self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
+
+
+class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
+
+ # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
+ # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
+
+ def _read_until_null(self):
+ return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
+
+ def handle(self):
+ sleep = self.socks_kwargs.get('sleep')
+ if sleep:
+ time.sleep(sleep)
+ socks_info = {
+ 'version': SOCKS4_VERSION,
+ 'command': None,
+ 'client_address': self.client_address,
+ 'ipv4_address': None,
+ 'port': None,
+ 'domain_address': None,
+ }
+ version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
+ socks_info['port'] = dest_port
+ socks_info['command'] = command
+ if version != SOCKS4_VERSION:
+ self.server.close_request(self.request)
return
- ydl = FakeYDL()
- req = urllib.request.Request('http://yt-dl.org/ip')
- req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
- self.assertEqual(
- ydl.urlopen(req).read().decode(),
- params['secondary_server_ip'])
-
- def test_secondary_proxy_https(self):
- params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
- if params is None:
+ use_remote_dns = False
+ if 0x0 < dest_ip <= 0xFF:
+ use_remote_dns = True
+ else:
+ socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
+
+ user_id = self._read_until_null().decode()
+ if user_id != (self.socks_kwargs.get('user_id') or ''):
+ self.connection.sendall(struct.pack(
+ '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL()
- req = urllib.request.Request('https://yt-dl.org/ip')
- req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
- self.assertEqual(
- ydl.urlopen(req).read().decode(),
- params['secondary_server_ip'])
+ if use_remote_dns:
+ socks_info['domain_address'] = self._read_until_null().decode()
-@is_download_test
-class TestSocks(unittest.TestCase):
- _SKIP_SOCKS_TEST = True
+ # dummy response, the returned IP is just a placeholder
+ self.connection.sendall(
+ struct.pack(
+ '!BBHI', SOCKS4_REPLY_VERSION,
+ self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
- def setUp(self):
- if self._SKIP_SOCKS_TEST:
- return
+ self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
- self.port = random.randint(20000, 30000)
- self.server_process = subprocess.Popen([
- 'srelay', '-f', '-i', '127.0.0.1:%d' % self.port],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- def tearDown(self):
- if self._SKIP_SOCKS_TEST:
- return
+class IPv6ThreadingTCPServer(ThreadingTCPServer):
+ address_family = socket.AF_INET6
+
+
+class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
+ def do_GET(self):
+ if self.path == '/socks_info':
+ payload = json.dumps(self.socks_info.copy())
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload.encode())
+
+
+@contextlib.contextmanager
+def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
+ server = server_thread = None
+ try:
+ bind_address = bind_ip or '127.0.0.1'
+ server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
+ server = server_type(
+ (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
+ server_port = http_server_port(server)
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.daemon = True
+ server_thread.start()
+ if '.' not in bind_address:
+ yield f'[{bind_address}]:{server_port}'
+ else:
+ yield f'{bind_address}:{server_port}'
+ finally:
+ server.shutdown()
+ server.server_close()
+ server_thread.join(2.0)
+
+
+class SocksProxyTestContext(abc.ABC):
+ REQUEST_HANDLER_CLASS = None
+
+ def socks_server(self, server_class, *args, **kwargs):
+ return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
+
+ @abc.abstractmethod
+ def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
+ """return a dict of socks_info"""
+
+
+class HTTPSocksTestProxyContext(SocksProxyTestContext):
+ REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
+
+ def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
+ request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
+ handler.validate(request)
+ return json.loads(handler.send(request).read().decode())
+
+
+CTX_MAP = {
+ 'http': HTTPSocksTestProxyContext,
+}
+
+
+@pytest.fixture(scope='module')
+def ctx(request):
+ return CTX_MAP[request.param]()
+
+
+class TestSocks4Proxy:
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4_no_auth(self, handler, ctx):
+ with handler() as rh:
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks4://{server_address}'})
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4_auth(self, handler, ctx):
+ with handler() as rh:
+ with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks4://user:@{server_address}'})
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='socks4a implementation currently broken when destination is not a domain name'))
+ ], indirect=True)
+ def test_socks4a_ipv4_target(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['version'] == 4
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['domain_address'] is None
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4a_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['version'] == 4
+ assert response['ipv4_address'] is None
+ assert response['domain_address'] == 'localhost'
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='source_address is not yet supported for socks4 proxies'))
+ ], indirect=True)
+ def test_ipv4_client_source_address(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(proxies={'all': f'socks4://{server_address}'},
+ source_address=source_address) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['client_address'][0] == source_address
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ @pytest.mark.parametrize('reply_code', [
+ Socks4CD.REQUEST_REJECTED_OR_FAILED,
+ Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
+ Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
+ ])
+ def test_socks4_errors(self, handler, ctx, reply_code):
+ with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh)
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 socks4 proxies are not yet supported'))
+ ], indirect=True)
+ def test_ipv6_socks4_proxy(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['client_address'][0] == '::1'
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_timeout(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}, timeout=1) as rh:
+ with pytest.raises(TransportError):
+ ctx.socks_info_request(rh)
+
+
+class TestSocks5Proxy:
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_no_auth(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['auth_methods'] == [0x0]
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_user_pass(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
+ with handler() as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
+
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
+
+ assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_ipv4_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5h_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['ipv4_address'] is None
+ assert response['domain_address'] == 'localhost'
+ assert response['version'] == 5
- self.server_process.terminate()
- self.server_process.communicate()
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5h_ip_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['domain_address'] is None
+ assert response['version'] == 5
- def _get_ip(self, protocol):
- if self._SKIP_SOCKS_TEST:
- return '127.0.0.1'
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 destination addresses are not yet supported'))
+ ], indirect=True)
+ def test_socks5_ipv6_destination(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='[::1]')
+ assert response['ipv6_address'] == '::1'
+ assert response['port'] == 80
+ assert response['version'] == 5
- ydl = FakeYDL({
- 'proxy': '%s://127.0.0.1:%d' % (protocol, self.port),
- })
- return ydl.urlopen('http://yt-dl.org/ip').read().decode()
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 socks5 proxies are not yet supported'))
+ ], indirect=True)
+ def test_ipv6_socks5_proxy(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['client_address'][0] == '::1'
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
- def test_socks4(self):
- self.assertTrue(isinstance(self._get_ip('socks4'), str))
+ # XXX: is there any feasible way of testing IPv6 source addresses?
+ # Same would go for non-proxy source_address test...
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='source_address is not yet supported for socks5 proxies'))
+ ], indirect=True)
+ def test_ipv4_client_source_address(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['client_address'][0] == source_address
+ assert response['version'] == 5
- def test_socks4a(self):
- self.assertTrue(isinstance(self._get_ip('socks4a'), str))
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ @pytest.mark.parametrize('reply_code', [
+ Socks5Reply.GENERAL_FAILURE,
+ Socks5Reply.CONNECTION_NOT_ALLOWED,
+ Socks5Reply.NETWORK_UNREACHABLE,
+ Socks5Reply.HOST_UNREACHABLE,
+ Socks5Reply.CONNECTION_REFUSED,
+ Socks5Reply.TTL_EXPIRED,
+ Socks5Reply.COMMAND_NOT_SUPPORTED,
+ Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
+ ])
+ def test_socks5_errors(self, handler, ctx, reply_code):
+ with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh)
- def test_socks5(self):
- self.assertTrue(isinstance(self._get_ip('socks5'), str))
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_timeout(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
+ with pytest.raises(TransportError):
+ ctx.socks_info_request(rh)
if __name__ == '__main__':
diff --git a/test/test_utils.py b/test/test_utils.py
index acb913a..c089582 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -5,6 +5,7 @@ import os
import re
import sys
import unittest
+import warnings
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -46,10 +47,9 @@ from hypervideo_dl.utils import (
encode_base_n,
encode_compat_str,
encodeFilename,
- escape_rfc3986,
- escape_url,
expand_path,
extract_attributes,
+ extract_basic_auth,
find_xpath_attr,
fix_xml_ampersands,
float_or_none,
@@ -102,15 +102,16 @@ from hypervideo_dl.utils import (
sanitize_filename,
sanitize_path,
sanitize_url,
- sanitized_Request,
shell_quote,
smuggle_url,
+ str_or_none,
str_to_int,
strip_jsonp,
strip_or_none,
subtitles_filename,
timeconvert,
traverse_obj,
+ try_call,
unescapeHTML,
unified_strdate,
unified_timestamp,
@@ -122,12 +123,19 @@ from hypervideo_dl.utils import (
urlencode_postdata,
urljoin,
urshift,
+ variadic,
version_tuple,
xpath_attr,
xpath_element,
xpath_text,
xpath_with_ns,
)
+from hypervideo_dl.utils.networking import (
+ HTTPHeaderDict,
+ escape_rfc3986,
+ normalize_url,
+ remove_dot_segments,
+)
class TestUtil(unittest.TestCase):
@@ -254,15 +262,6 @@ class TestUtil(unittest.TestCase):
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
self.assertEqual(sanitize_url('foo bar'), 'foo bar')
- def test_extract_basic_auth(self):
- auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
- self.assertFalse(auth_header('http://foo.bar'))
- self.assertFalse(auth_header('http://:foo.bar'))
- self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
- self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
- self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
- self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
-
def test_expand_path(self):
def env(var):
return f'%{var}%' if sys.platform == 'win32' else f'${var}'
@@ -659,6 +658,8 @@ class TestUtil(unittest.TestCase):
self.assertEqual(parse_duration('P0Y0M0DT0H4M20.880S'), 260.88)
self.assertEqual(parse_duration('01:02:03:050'), 3723.05)
self.assertEqual(parse_duration('103:050'), 103.05)
+ self.assertEqual(parse_duration('1HR 3MIN'), 3780)
+ self.assertEqual(parse_duration('2hrs 3mins'), 7380)
def test_fix_xml_ampersands(self):
self.assertEqual(
@@ -935,24 +936,124 @@ class TestUtil(unittest.TestCase):
self.assertEqual(escape_rfc3986('foo bar'), 'foo%20bar')
self.assertEqual(escape_rfc3986('foo%20bar'), 'foo%20bar')
- def test_escape_url(self):
+ def test_normalize_url(self):
self.assertEqual(
- escape_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
+ normalize_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
'http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavre%CC%81_FD.mp4'
)
self.assertEqual(
- escape_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
+ normalize_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
'http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erkl%C3%A4rt/Das-Erste/Video?documentId=22673108&bcastId=5290'
)
self.assertEqual(
- escape_url('http://тест.рф/фрагмент'),
+ normalize_url('http://тест.рф/фрагмент'),
'http://xn--e1aybc.xn--p1ai/%D1%84%D1%80%D0%B0%D0%B3%D0%BC%D0%B5%D0%BD%D1%82'
)
self.assertEqual(
- escape_url('http://тест.рф/абв?абв=абв#абв'),
+ normalize_url('http://тест.рф/абв?абв=абв#абв'),
'http://xn--e1aybc.xn--p1ai/%D0%B0%D0%B1%D0%B2?%D0%B0%D0%B1%D0%B2=%D0%B0%D0%B1%D0%B2#%D0%B0%D0%B1%D0%B2'
)
- self.assertEqual(escape_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+ self.assertEqual(normalize_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+
+ self.assertEqual(normalize_url('http://www.example.com/../a/b/../c/./d.html'), 'http://www.example.com/a/c/d.html')
+
+ def test_remove_dot_segments(self):
+ self.assertEqual(remove_dot_segments('/a/b/c/./../../g'), '/a/g')
+ self.assertEqual(remove_dot_segments('mid/content=5/../6'), 'mid/6')
+ self.assertEqual(remove_dot_segments('/ad/../cd'), '/cd')
+ self.assertEqual(remove_dot_segments('/ad/../cd/'), '/cd/')
+ self.assertEqual(remove_dot_segments('/..'), '/')
+ self.assertEqual(remove_dot_segments('/./'), '/')
+ self.assertEqual(remove_dot_segments('/./a'), '/a')
+ self.assertEqual(remove_dot_segments('/abc/./.././d/././e/.././f/./../../ghi'), '/ghi')
+ self.assertEqual(remove_dot_segments('/'), '/')
+ self.assertEqual(remove_dot_segments('/t'), '/t')
+ self.assertEqual(remove_dot_segments('t'), 't')
+ self.assertEqual(remove_dot_segments(''), '')
+ self.assertEqual(remove_dot_segments('/../a/b/c'), '/a/b/c')
+ self.assertEqual(remove_dot_segments('../a'), 'a')
+ self.assertEqual(remove_dot_segments('./a'), 'a')
+ self.assertEqual(remove_dot_segments('.'), '')
+ self.assertEqual(remove_dot_segments('////'), '////')
+
+ def test_js_to_json_vars_strings(self):
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'null': a,
+ 'nullStr': b,
+ 'true': c,
+ 'trueStr': d,
+ 'false': e,
+ 'falseStr': f,
+ 'unresolvedVar': g,
+ }''',
+ {
+ 'a': 'null',
+ 'b': '"null"',
+ 'c': 'true',
+ 'd': '"true"',
+ 'e': 'false',
+ 'f': '"false"',
+ 'g': 'var',
+ }
+ )),
+ {
+ 'null': None,
+ 'nullStr': 'null',
+ 'true': True,
+ 'trueStr': 'true',
+ 'false': False,
+ 'falseStr': 'false',
+ 'unresolvedVar': 'var'
+ }
+ )
+
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'int': a,
+ 'intStr': b,
+ 'float': c,
+ 'floatStr': d,
+ }''',
+ {
+ 'a': '123',
+ 'b': '"123"',
+ 'c': '1.23',
+ 'd': '"1.23"',
+ }
+ )),
+ {
+ 'int': 123,
+ 'intStr': '123',
+ 'float': 1.23,
+ 'floatStr': '1.23',
+ }
+ )
+
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'object': a,
+ 'objectStr': b,
+ 'array': c,
+ 'arrayStr': d,
+ }''',
+ {
+ 'a': '{}',
+ 'b': '"{}"',
+ 'c': '[]',
+ 'd': '"[]"',
+ }
+ )),
+ {
+ 'object': {},
+ 'objectStr': '{}',
+ 'array': [],
+ 'arrayStr': '[]',
+ }
+ )
def test_js_to_json_realworld(self):
inp = '''{
@@ -1110,6 +1211,13 @@ class TestUtil(unittest.TestCase):
self.assertEqual(js_to_json('42a1'), '42"a1"')
self.assertEqual(js_to_json('42a-1'), '42"a"-1')
+ def test_js_to_json_template_literal(self):
+ self.assertEqual(js_to_json('`Hello ${name}`', {'name': '"world"'}), '"Hello world"')
+ self.assertEqual(js_to_json('`${name}${name}`', {'name': '"X"'}), '"XX"')
+ self.assertEqual(js_to_json('`${name}${name}`', {'name': '5'}), '"55"')
+ self.assertEqual(js_to_json('`${name}"${name}"`', {'name': '5'}), '"5\\"5\\""')
+ self.assertEqual(js_to_json('`${name}`', {}), '"name"')
+
def test_extract_attributes(self):
self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'})
@@ -1745,6 +1853,8 @@ Line 1
def test_clean_podcast_url(self):
self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3')
self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3')
+ self.assertEqual(clean_podcast_url('https://pdst.fm/e/2.gum.fm/chtbl.com/track/chrt.fm/track/34D33/pscrb.fm/rss/p/traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661'), 'https://traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661')
+ self.assertEqual(clean_podcast_url('https://pdst.fm/e/https://mgln.ai/e/441/www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3'), 'https://www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3')
def test_LazyList(self):
it = list(range(10))
@@ -1874,6 +1984,8 @@ Line 1
vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['m4a']), 'mkv')
self.assertEqual(get_compatible_ext(
vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['webm']), 'webm')
+ self.assertEqual(get_compatible_ext(
+ vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['weba']), 'webm')
self.assertEqual(get_compatible_ext(
vcodecs=['h264'], acodecs=['mp4a'], vexts=['mov'], aexts=['m4a']), 'mp4')
@@ -1885,6 +1997,35 @@ Line 1
self.assertEqual(get_compatible_ext(
vcodecs=['av1'], acodecs=['mp4a'], vexts=['webm'], aexts=['m4a'], preferences=('webm', 'mkv')), 'mkv')
+ def test_try_call(self):
+ def total(*x, **kwargs):
+ return sum(x) + sum(kwargs.values())
+
+ self.assertEqual(try_call(None), None,
+ msg='not a fn should give None')
+ self.assertEqual(try_call(lambda: 1), 1,
+ msg='int fn with no expected_type should give int')
+ self.assertEqual(try_call(lambda: 1, expected_type=int), 1,
+ msg='int fn with expected_type int should give int')
+ self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
+ msg='int fn with wrong expected_type should give None')
+ self.assertEqual(try_call(total, args=(0, 1, 0, ), expected_type=int), 1,
+ msg='fn should accept arglist')
+ self.assertEqual(try_call(total, kwargs={'a': 0, 'b': 1, 'c': 0}, expected_type=int), 1,
+ msg='fn should accept kwargs')
+ self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
+ msg='int fn with no expected_type should give None')
+ self.assertEqual(try_call(lambda x: {}, total, args=(42, ), expected_type=int), 42,
+ msg='expect first int result with expected_type int')
+
+ def test_variadic(self):
+ self.assertEqual(variadic(None), (None, ))
+ self.assertEqual(variadic('spam'), ('spam', ))
+ self.assertEqual(variadic('spam', allowed_types=dict), 'spam')
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore')
+ self.assertEqual(variadic('spam', allowed_types=[dict]), 'spam')
+
def test_traverse_obj(self):
_TEST_DATA = {
100: 100,
@@ -1918,8 +2059,8 @@ Line 1
# Test Ellipsis behavior
self.assertCountEqual(traverse_obj(_TEST_DATA, ...),
- (item for item in _TEST_DATA.values() if item is not None),
- msg='`...` should give all values except `None`')
+ (item for item in _TEST_DATA.values() if item not in (None, {})),
+ msg='`...` should give all non discarded values')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, ...)), _TEST_DATA['urls'][0].values(),
msg='`...` selection for dicts should select all values')
self.assertEqual(traverse_obj(_TEST_DATA, (..., ..., 'url')),
@@ -1927,6 +2068,8 @@ Line 1
msg='nested `...` queries should work')
self.assertCountEqual(traverse_obj(_TEST_DATA, (..., ..., 'index')), range(4),
msg='`...` query result should be flattened')
+ self.assertEqual(traverse_obj(iter(range(4)), ...), list(range(4)),
+ msg='`...` should accept iterables')
# Test function as key
self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)),
@@ -1934,6 +2077,42 @@ Line 1
msg='function as query key should perform a filter based on (key, value)')
self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], str)), {'str'},
msg='exceptions in the query function should be catched')
+ self.assertEqual(traverse_obj(iter(range(4)), lambda _, x: x % 2 == 0), [0, 2],
+ msg='function key should accept iterables')
+ if __debug__:
+ with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
+ traverse_obj(_TEST_DATA, lambda a: ...)
+ with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
+ traverse_obj(_TEST_DATA, lambda a, b, c: ...)
+
+ # Test set as key (transformation/type, like `expected_type`)
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str.upper}, )), ['STR'],
+ msg='Function in set should be a transformation')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str})), ['str'],
+ msg='Type in set should be a type filter')
+ self.assertEqual(traverse_obj(_TEST_DATA, {dict}), _TEST_DATA,
+ msg='A single set should be wrapped into a path')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str.upper})), ['STR'],
+ msg='Transformation function should not raise')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str_or_none})),
+ [item for item in map(str_or_none, _TEST_DATA.values()) if item is not None],
+ msg='Function in set should be a transformation')
+ if __debug__:
+ with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
+ traverse_obj(_TEST_DATA, set())
+ with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
+ traverse_obj(_TEST_DATA, {str.upper, str})
+
+ # Test `slice` as a key
+ _SLICE_DATA = [0, 1, 2, 3, 4]
+ self.assertEqual(traverse_obj(_TEST_DATA, ('dict', slice(1))), None,
+ msg='slice on a dictionary should not throw')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1)), _SLICE_DATA[:1],
+ msg='slice key should apply slice to sequence')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 2)), _SLICE_DATA[1:2],
+ msg='slice key should apply slice to sequence')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 4, 2)), _SLICE_DATA[1:4:2],
+ msg='slice key should apply slice to sequence')
# Test alternative paths
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str',
@@ -1979,15 +2158,23 @@ Line 1
{0: ['https://www.example.com/1', 'https://www.example.com/0']},
msg='tripple nesting in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {},
- msg='remove `None` values when dict key')
+ msg='remove `None` values when top level dict key fails')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=...), {0: ...},
- msg='do not remove `None` values if `default`')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {0: {}},
- msg='do not remove empty values when dict key')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: {}},
- msg='do not remove empty values when dict key and a default')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {0: []},
- msg='if branch in dict key not successful, return `[]`')
+ msg='use `default` if key fails and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {},
+ msg='remove empty values when dict key')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: ...},
+ msg='use `default` when dict key and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}), {},
+ msg='remove empty values when nested dict key fails')
+ self.assertEqual(traverse_obj(None, {0: 'fail'}), {},
+ msg='default to dict if pruned')
+ self.assertEqual(traverse_obj(None, {0: 'fail'}, default=...), {0: ...},
+ msg='default to dict if pruned and default is given')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}, default=...), {0: {0: ...}},
+ msg='use nested `default` when nested dict key fails and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {},
+ msg='remove key if branch in dict key not successful')
# Testing default parameter behavior
_DEFAULT_DATA = {'None': None, 'int': 0, 'list': []}
@@ -2011,20 +2198,55 @@ Line 1
msg='if branched but not successful return `[]`, not `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', ...)), [],
msg='if branched but object is empty return `[]`, not `default`')
+ self.assertEqual(traverse_obj(None, ...), [],
+ msg='if branched but object is `None` return `[]`, not `default`')
+ self.assertEqual(traverse_obj({0: None}, (0, ...)), [],
+ msg='if branched but state is `None` return `[]`, not `default`')
+
+ branching_paths = [
+ ('fail', ...),
+ (..., 'fail'),
+ 100 * ('fail',) + (...,),
+ (...,) + 100 * ('fail',),
+ ]
+ for branching_path in branching_paths:
+ self.assertEqual(traverse_obj({}, branching_path), [],
+ msg='if branched but state is `None`, return `[]` (not `default`)')
+ self.assertEqual(traverse_obj({}, 'fail', branching_path), [],
+ msg='if branching in last alternative and previous did not match, return `[]` (not `default`)')
+ self.assertEqual(traverse_obj({0: 'x'}, 0, branching_path), 'x',
+ msg='if branching in last alternative and previous did match, return single value')
+ self.assertEqual(traverse_obj({0: 'x'}, branching_path, 0), 'x',
+ msg='if branching in first alternative and non-branching path does match, return single value')
+ self.assertEqual(traverse_obj({}, branching_path, 'fail'), None,
+ msg='if branching in first alternative and non-branching path does not match, return `default`')
# Testing expected_type behavior
_EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0}
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str), 'str',
- msg='accept matching `expected_type` type')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int), None,
- msg='reject non matching `expected_type` type')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)), '0',
- msg='transform type using type function')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str',
- expected_type=lambda _: 1 / 0), None,
- msg='wrap expected_type fuction in try_call')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str), ['str'],
- msg='eliminate items that expected_type fails on')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str),
+ 'str', msg='accept matching `expected_type` type')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int),
+ None, msg='reject non matching `expected_type` type')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)),
+ '0', msg='transform type using type function')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=lambda _: 1 / 0),
+ None, msg='wrap expected_type fuction in try_call')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str),
+ ['str'], msg='eliminate items that expected_type fails on')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int),
+ {0: 100}, msg='type as expected_type should filter dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none),
+ {0: '100', 1: '1.2'}, msg='function as expected_type should transform dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, {int_or_none}), expected_type=int),
+ 1, msg='expected_type should not filter non final dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int),
+ {0: {0: 100}}, msg='expected_type should transform deep dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(...)),
+ [{0: ...}, {0: ...}], msg='expected_type should transform branched dict values')
+ self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int),
+ [4], msg='expected_type regression for type matching in tuple branching')
+ self.assertEqual(traverse_obj(_TEST_DATA, ['data', ...], expected_type=int),
+ [], msg='expected_type regression for type matching in dict result')
# Test get_all behavior
_GET_ALL_DATA = {'key': [0, 1, 2]}
@@ -2064,14 +2286,23 @@ Line 1
traverse_string=True), '.',
msg='traverse into converted data if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', ...),
- traverse_string=True), list('str'),
- msg='`...` branching into string should result in list')
+ traverse_string=True), 'str',
+ msg='`...` should result in string (same value) if `traverse_string`')
+ self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', slice(0, None, 2)),
+ traverse_string=True), 'sr',
+ msg='`slice` should result in string if `traverse_string`')
+ self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda i, v: i or v == "s"),
+ traverse_string=True), 'str',
+ msg='function should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)),
traverse_string=True), ['s', 'r'],
- msg='branching into string should result in list')
- self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda _, x: x),
- traverse_string=True), list('str'),
- msg='function branching into string should result in list')
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, ...), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, lambda x, y: True), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, slice(1)), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
# Test is_user_input behavior
_IS_USER_INPUT_DATA = {'range8': list(range(8))}
@@ -2108,6 +2339,48 @@ Line 1
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 8), None,
msg='failing int key on a `re.Match` should return `default`')
+ self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
+ msg='function on a `re.Match` should give group name as well')
+
+ def test_http_header_dict(self):
+ headers = HTTPHeaderDict()
+ headers['ytdl-test'] = b'0'
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '0')])
+ headers['ytdl-test'] = 1
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')])
+ headers['Ytdl-test'] = '2'
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')])
+ self.assertTrue('ytDl-Test' in headers)
+ self.assertEqual(str(headers), str(dict(headers)))
+ self.assertEqual(repr(headers), str(dict(headers)))
+
+ headers.update({'X-dlp': 'data'})
+ self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')})
+ self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'})
+ self.assertEqual(len(headers), 2)
+ self.assertEqual(headers.copy(), headers)
+ headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'})
+ self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')})
+ self.assertEqual(len(headers2), 2)
+ headers2.clear()
+ self.assertEqual(len(headers2), 0)
+
+ # ensure we prefer latter headers
+ headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2})
+ self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')})
+ del headers3['ytdl-tesT']
+ self.assertEqual(dict(headers3), {})
+
+ headers4 = HTTPHeaderDict({'ytdl-test': 'data;'})
+ self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')})
+
+ def test_extract_basic_auth(self):
+ assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None)
+ assert extract_basic_auth('http://foo.bar') == ('http://foo.bar', None)
+ assert extract_basic_auth('http://@foo.bar') == ('http://foo.bar', 'Basic Og==')
+ assert extract_basic_auth('http://:pass@foo.bar') == ('http://foo.bar', 'Basic OnBhc3M=')
+ assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
+ assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
if __name__ == '__main__':
diff --git a/test/testdata/yt_dlp_plugins/extractor/_ignore.py b/test/testdata/yt_dlp_plugins/extractor/_ignore.py
new file mode 100644
index 0000000..3ee321b
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/_ignore.py
@@ -0,0 +1,5 @@
+from hypervideo_dl.extractor.common import InfoExtractor
+
+
+class IgnorePluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/yt_dlp_plugins/extractor/ignore.py b/test/testdata/yt_dlp_plugins/extractor/ignore.py
new file mode 100644
index 0000000..0f7eaa4
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/ignore.py
@@ -0,0 +1,12 @@
+from hypervideo_dl.extractor.common import InfoExtractor
+
+
+class IgnoreNotInAllPluginIE(InfoExtractor):
+ pass
+
+
+class InAllPluginIE(InfoExtractor):
+ pass
+
+
+__all__ = ['InAllPluginIE']
diff --git a/test/testdata/yt_dlp_plugins/extractor/normal.py b/test/testdata/yt_dlp_plugins/extractor/normal.py
new file mode 100644
index 0000000..905b6b3
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/normal.py
@@ -0,0 +1,9 @@
+from hypervideo_dl.extractor.common import InfoExtractor
+
+
+class NormalPluginIE(InfoExtractor):
+ pass
+
+
+class _IgnoreUnderscorePluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/yt_dlp_plugins/postprocessor/normal.py b/test/testdata/yt_dlp_plugins/postprocessor/normal.py
new file mode 100644
index 0000000..51d3be6
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/postprocessor/normal.py
@@ -0,0 +1,5 @@
+from hypervideo_dl.postprocessor.common import PostProcessor
+
+
+class NormalPluginPP(PostProcessor):
+ pass
diff --git a/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py b/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py
new file mode 100644
index 0000000..c2263e1
--- /dev/null
+++ b/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py
@@ -0,0 +1,5 @@
+from hypervideo_dl.extractor.common import InfoExtractor
+
+
+class ZippedPluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py b/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py
new file mode 100644
index 0000000..047ebae
--- /dev/null
+++ b/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py
@@ -0,0 +1,5 @@
+from hypervideo_dl.postprocessor.common import PostProcessor
+
+
+class ZippedPluginPP(PostProcessor):
+ pass