aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py21
-rw-r--r--test/helper.py4
-rw-r--r--test/test_InfoExtractor.py128
-rw-r--r--test/test_YoutubeDL.py168
-rw-r--r--test/test_YoutubeDLCookieJar.py24
-rw-r--r--test/test_aes.py6
-rw-r--r--test/test_age_restriction.py19
-rw-r--r--test/test_compat.py9
-rw-r--r--test/test_config.py227
-rw-r--r--test/test_cookies.py18
-rwxr-xr-xtest/test_download.py9
-rw-r--r--test/test_downloader_external.py139
-rw-r--r--test/test_downloader_http.py12
-rw-r--r--test/test_http.py192
-rw-r--r--test/test_jsinterp.py606
-rw-r--r--test/test_networking.py1439
-rw-r--r--test/test_networking_utils.py282
-rw-r--r--test/test_plugins.py73
-rw-r--r--test/test_socks.py521
-rw-r--r--test/test_utils.py363
-rw-r--r--test/test_youtube_signature.py33
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/_ignore.py5
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/ignore.py12
-rw-r--r--test/testdata/yt_dlp_plugins/extractor/normal.py9
-rw-r--r--test/testdata/yt_dlp_plugins/postprocessor/normal.py5
-rw-r--r--test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py5
-rw-r--r--test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py5
27 files changed, 3538 insertions, 796 deletions
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 000000000..15549d30b
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,21 @@
+import functools
+import inspect
+
+import pytest
+
+from yt_dlp.networking import RequestHandler
+from yt_dlp.networking.common import _REQUEST_HANDLERS
+from yt_dlp.utils._utils import _YDLLogger as FakeLogger
+
+
+@pytest.fixture
+def handler(request):
+ RH_KEY = request.param
+ if inspect.isclass(RH_KEY) and issubclass(RH_KEY, RequestHandler):
+ handler = RH_KEY
+ elif RH_KEY in _REQUEST_HANDLERS:
+ handler = _REQUEST_HANDLERS[RH_KEY]
+ else:
+ pytest.skip(f'{RH_KEY} request handler is not available')
+
+ return functools.partial(handler, logger=FakeLogger)
diff --git a/test/helper.py b/test/helper.py
index 0b90660ff..539b2f618 100644
--- a/test/helper.py
+++ b/test/helper.py
@@ -194,8 +194,8 @@ def sanitize_got_info_dict(got_dict):
'formats', 'thumbnails', 'subtitles', 'automatic_captions', 'comments', 'entries',
# Auto-generated
- 'autonumber', 'playlist', 'format_index', 'video_ext', 'audio_ext', 'duration_string', 'epoch',
- 'fulltitle', 'extractor', 'extractor_key', 'filepath', 'infojson_filename', 'original_url', 'n_entries',
+ 'autonumber', 'playlist', 'format_index', 'video_ext', 'audio_ext', 'duration_string', 'epoch', 'n_entries',
+ 'fulltitle', 'extractor', 'extractor_key', 'filename', 'filepath', 'infojson_filename', 'original_url',
# Only live_status needs to be checked
'is_live', 'was_live',
diff --git a/test/test_InfoExtractor.py b/test/test_InfoExtractor.py
index 683ead315..b7dee496a 100644
--- a/test/test_InfoExtractor.py
+++ b/test/test_InfoExtractor.py
@@ -69,6 +69,7 @@ class TestInfoExtractor(unittest.TestCase):
<meta name="og:test1" content='foo > < bar'/>
<meta name="og:test2" content="foo >//< bar"/>
<meta property=og-test3 content='Ill-formatted opengraph'/>
+ <meta property=og:test4 content=unquoted-value/>
'''
self.assertEqual(ie._og_search_title(html), 'Foo')
self.assertEqual(ie._og_search_description(html), 'Some video\'s description ')
@@ -81,6 +82,7 @@ class TestInfoExtractor(unittest.TestCase):
self.assertEqual(ie._og_search_property(('test0', 'test1'), html), 'foo > < bar')
self.assertRaises(RegexNotFoundError, ie._og_search_property, 'test0', html, None, fatal=True)
self.assertRaises(RegexNotFoundError, ie._og_search_property, ('test0', 'test00'), html, None, fatal=True)
+ self.assertEqual(ie._og_search_property('test4', html), 'unquoted-value')
def test_html_search_meta(self):
ie = self.ie
@@ -915,8 +917,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 263.851,
- 'abr': 0,
}, {
'format_id': '577',
'format_index': None,
@@ -934,8 +934,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 577.61,
- 'abr': 0,
}, {
'format_id': '915',
'format_index': None,
@@ -953,8 +951,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 915.905,
- 'abr': 0,
}, {
'format_id': '1030',
'format_index': None,
@@ -972,8 +968,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 1030.138,
- 'abr': 0,
}, {
'format_id': '1924',
'format_index': None,
@@ -991,8 +985,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'acodec': 'mp4a.40.2',
'video_ext': 'mp4',
'audio_ext': 'none',
- 'vbr': 1924.009,
- 'abr': 0,
}],
{
'en': [{
@@ -1404,6 +1396,7 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'none',
'acodec': 'AACL',
'protocol': 'ism',
+ 'audio_channels': 2,
'_download_params': {
'stream_type': 'audio',
'duration': 8880746666,
@@ -1417,9 +1410,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 128,
}, {
'format_id': 'video-100',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1443,9 +1433,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 100,
}, {
'format_id': 'video-326',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1469,9 +1456,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 326,
}, {
'format_id': 'video-698',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1495,9 +1479,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 698,
}, {
'format_id': 'video-1493',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1521,9 +1502,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 1493,
}, {
'format_id': 'video-4482',
'url': 'https://sdn-global-streaming-cache-3qsdn.akamaized.net/stream/3144/files/17/07/672975/3144-kZT4LWMQw6Rh7Kpd.ism/Manifest',
@@ -1547,9 +1525,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 4482,
}],
{
'eng': [
@@ -1573,61 +1548,57 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'ec-3_test',
'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
[{
- 'format_id': 'audio_deu_1-224',
+ 'format_id': 'audio_deu-127',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'manifest_url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'ext': 'isma',
- 'tbr': 224,
+ 'tbr': 127,
'asr': 48000,
'vcodec': 'none',
- 'acodec': 'EC-3',
+ 'acodec': 'AACL',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ 'audio_channels': 2,
+ '_download_params': {
'stream_type': 'audio',
'duration': 370000000,
'timescale': 10000000,
'width': 0,
'height': 0,
- 'fourcc': 'EC-3',
+ 'fourcc': 'AACL',
'language': 'deu',
- 'codec_private_data': '00063F000000AF87FBA7022DFB42A4D405CD93843BDD0700200F00',
+ 'codec_private_data': '1190',
'sampling_rate': 48000,
- 'channels': 6,
+ 'channels': 2,
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 224,
}, {
- 'format_id': 'audio_deu-127',
+ 'format_id': 'audio_deu_1-224',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'manifest_url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
'ext': 'isma',
- 'tbr': 127,
+ 'tbr': 224,
'asr': 48000,
'vcodec': 'none',
- 'acodec': 'AACL',
+ 'acodec': 'EC-3',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ 'audio_channels': 6,
+ '_download_params': {
'stream_type': 'audio',
'duration': 370000000,
'timescale': 10000000,
'width': 0,
'height': 0,
- 'fourcc': 'AACL',
+ 'fourcc': 'EC-3',
'language': 'deu',
- 'codec_private_data': '1190',
+ 'codec_private_data': '00063F000000AF87FBA7022DFB42A4D405CD93843BDD0700200F00',
'sampling_rate': 48000,
- 'channels': 2,
+ 'channels': 6,
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'audio_ext': 'isma',
- 'video_ext': 'none',
- 'abr': 127,
}, {
'format_id': 'video_deu-23',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1639,8 +1610,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1653,9 +1624,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 23,
}, {
'format_id': 'video_deu-403',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1667,8 +1635,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1681,9 +1649,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 403,
}, {
'format_id': 'video_deu-680',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1695,8 +1660,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1709,9 +1674,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 680,
}, {
'format_id': 'video_deu-1253',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1723,8 +1685,9 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'vbr': 1253,
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1737,9 +1700,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 1253,
}, {
'format_id': 'video_deu-2121',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1751,8 +1711,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1765,9 +1725,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 2121,
}, {
'format_id': 'video_deu-3275',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1779,8 +1736,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1793,9 +1750,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 3275,
}, {
'format_id': 'video_deu-5300',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1807,8 +1761,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1821,9 +1775,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 5300,
}, {
'format_id': 'video_deu-8079',
'url': 'https://smstr01.dmm.t-online.de/smooth24/smoothstream_m1/streaming/sony/9221438342941275747/636887760842957027/25_km_h-Trailer-9221571562372022953_deu_20_1300k_HD_H_264_ISMV.ism/Manifest',
@@ -1835,8 +1786,8 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'vcodec': 'AVC1',
'acodec': 'none',
'protocol': 'ism',
- '_download_params':
- {
+ 'language': 'deu',
+ '_download_params': {
'stream_type': 'video',
'duration': 370000000,
'timescale': 10000000,
@@ -1849,9 +1800,6 @@ jwplayer("mediaplayer").setup({"abouttext":"Visit Indie DB","aboutlink":"http:\/
'bits_per_sample': 16,
'nal_unit_length_field': 4
},
- 'video_ext': 'ismv',
- 'audio_ext': 'none',
- 'vbr': 8079,
}],
{},
),
diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py
index 8da1e5e4b..3cfb61fb2 100644
--- a/test/test_YoutubeDL.py
+++ b/test/test_YoutubeDL.py
@@ -10,9 +10,8 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import copy
import json
-import urllib.error
-from test.helper import FakeYDL, assertRegexpMatches
+from test.helper import FakeYDL, assertRegexpMatches, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE
@@ -25,6 +24,7 @@ from yt_dlp.utils import (
int_or_none,
match_filter_func,
)
+from yt_dlp.utils.traversal import traverse_obj
TEST_URL = 'http://localhost/sample.mp4'
@@ -632,6 +632,7 @@ class TestYoutubeDL(unittest.TestCase):
outtmpl_info = {
'id': '1234',
+ 'id': '1234',
'ext': 'mp4',
'width': None,
'height': 1080,
@@ -669,7 +670,7 @@ class TestYoutubeDL(unittest.TestCase):
for (name, got), expect in zip((('outtmpl', out), ('filename', fname)), expected):
if callable(expect):
self.assertTrue(expect(got), f'Wrong {name} from {tmpl}')
- else:
+ elif expect is not None:
self.assertEqual(got, expect, f'Wrong {name} from {tmpl}')
# Side-effects
@@ -684,7 +685,8 @@ class TestYoutubeDL(unittest.TestCase):
test('%(id)s.%(ext)s', '1234.mp4')
test('%(duration_string)s', ('27:46:40', '27-46-40'))
test('%(resolution)s', '1080p')
- test('%(playlist_index)s', '001')
+ test('%(playlist_index|)s', '001')
+ test('%(playlist_index&{}!)s', '1!')
test('%(playlist_autonumber)s', '02')
test('%(autonumber)s', '00001')
test('%(autonumber+2)03d', '005', autonumber_start=3)
@@ -755,20 +757,23 @@ class TestYoutubeDL(unittest.TestCase):
test('%(ext)c', 'm')
test('%(id)d %(id)r', "1234 '1234'")
test('%(id)r %(height)r', "'1234' 1080")
+ test('%(title5)a %(height)a', (R"'\xe1\xe9\xed \U0001d400' 1080", None))
test('%(ext)s-%(ext|def)d', 'mp4-def')
- test('%(width|0)04d', '0000')
- test('a%(width|)d', 'a', outtmpl_na_placeholder='none')
+ test('%(width|0)04d', '0')
+ test('a%(width|b)d', 'ab', outtmpl_na_placeholder='none')
FORMATS = self.outtmpl_info['formats']
- sanitize = lambda x: x.replace(':', ':').replace('"', """).replace('\n', ' ')
# Custom type casting
test('%(formats.:.id)l', 'id 1, id 2, id 3')
test('%(formats.:.id)#l', ('id 1\nid 2\nid 3', 'id 1 id 2 id 3'))
test('%(ext)l', 'mp4')
test('%(formats.:.id) 18l', ' id 1, id 2, id 3')
- test('%(formats)j', (json.dumps(FORMATS), sanitize(json.dumps(FORMATS))))
- test('%(formats)#j', (json.dumps(FORMATS, indent=4), sanitize(json.dumps(FORMATS, indent=4))))
+ test('%(formats)j', (json.dumps(FORMATS), None))
+ test('%(formats)#j', (
+ json.dumps(FORMATS, indent=4),
+ json.dumps(FORMATS, indent=4).replace(':', ':').replace('"', """).replace('\n', ' ')
+ ))
test('%(title5).3B', 'á')
test('%(title5)U', 'áéí 𝐀')
test('%(title5)#U', 'a\u0301e\u0301i\u0301 𝐀')
@@ -793,8 +798,8 @@ class TestYoutubeDL(unittest.TestCase):
test('%(title|%)s %(title|%%)s', '% %%')
test('%(id+1-height+3)05d', '00158')
test('%(width+100)05d', 'NA')
- test('%(formats.0) 15s', ('% 15s' % FORMATS[0], '% 15s' % sanitize(str(FORMATS[0]))))
- test('%(formats.0)r', (repr(FORMATS[0]), sanitize(repr(FORMATS[0]))))
+ test('%(formats.0) 15s', ('% 15s' % FORMATS[0], None))
+ test('%(formats.0)r', (repr(FORMATS[0]), None))
test('%(height.0)03d', '001')
test('%(-height.0)04d', '-001')
test('%(formats.-1.id)s', FORMATS[-1]['id'])
@@ -806,7 +811,7 @@ class TestYoutubeDL(unittest.TestCase):
out = json.dumps([{'id': f['id'], 'height.:2': str(f['height'])[:2]}
if 'height' in f else {'id': f['id']}
for f in FORMATS])
- test('%(formats.:.{id,height.:2})j', (out, sanitize(out)))
+ test('%(formats.:.{id,height.:2})j', (out, None))
test('%(formats.:.{id,height}.id)l', ', '.join(f['id'] for f in FORMATS))
test('%(.{id,title})j', ('{"id": "1234"}', '{"id": "1234"}'))
@@ -822,6 +827,11 @@ class TestYoutubeDL(unittest.TestCase):
test('%(title&foo|baz)s.bar', 'baz.bar')
test('%(x,id&foo|baz)s.bar', 'foo.bar')
test('%(x,title&foo|baz)s.bar', 'baz.bar')
+ test('%(id&a\nb|)s', ('a\nb', 'a b'))
+ test('%(id&hi {:>10} {}|)s', 'hi 1234 1234')
+ test(R'%(id&{0} {}|)s', 'NA')
+ test(R'%(id&{0.1}|)s', 'NA')
+ test('%(height&{:,d})S', '1,080')
# Laziness
def gen():
@@ -867,12 +877,12 @@ class TestYoutubeDL(unittest.TestCase):
class SimplePP(PostProcessor):
def run(self, info):
- with open(audiofile, 'wt') as f:
+ with open(audiofile, 'w') as f:
f.write('EXAMPLE')
return [info['filepath']], info
def run_pp(params, PP):
- with open(filename, 'wt') as f:
+ with open(filename, 'w') as f:
f.write('EXAMPLE')
ydl = YoutubeDL(params)
ydl.add_post_processor(PP())
@@ -891,7 +901,7 @@ class TestYoutubeDL(unittest.TestCase):
class ModifierPP(PostProcessor):
def run(self, info):
- with open(info['filepath'], 'wt') as f:
+ with open(info['filepath'], 'w') as f:
f.write('MODIFIED')
return [], info
@@ -1093,11 +1103,6 @@ class TestYoutubeDL(unittest.TestCase):
test_selection({'playlist_items': '-15::2'}, INDICES[1::2], True)
test_selection({'playlist_items': '-15::15'}, [], True)
- def test_urlopen_no_file_protocol(self):
- # see https://github.com/ytdl-org/youtube-dl/issues/8227
- ydl = YDL()
- self.assertRaises(urllib.error.URLError, ydl.urlopen, 'file:///etc/passwd')
-
def test_do_not_override_ie_key_in_url_transparent(self):
ydl = YDL()
@@ -1211,6 +1216,129 @@ class TestYoutubeDL(unittest.TestCase):
self.assertEqual(downloaded['extractor'], 'Video')
self.assertEqual(downloaded['extractor_key'], 'Video')
+ def test_header_cookies(self):
+ from http.cookiejar import Cookie
+
+ ydl = FakeYDL()
+ ydl.report_warning = lambda *_, **__: None
+
+ def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
+ return Cookie(
+ version or 0, name, value, None, False,
+ domain, bool(domain), bool(domain), path, bool(path),
+ secure, expires, False, None, None, rest={})
+
+ _test_url = 'https://yt.dlp/test'
+
+ def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
+ def _test():
+ ydl.cookiejar.clear()
+ ydl._load_cookies(encoded_cookies, autoscope=headers)
+ if headers:
+ ydl._apply_header_cookies(_test_url)
+ data = {'url': _test_url}
+ ydl._calc_headers(data)
+ self.assertCountEqual(
+ map(vars, ydl.cookiejar), map(vars, cookies),
+ 'Extracted cookiejar.Cookie is not the same')
+ if not headers:
+ self.assertEqual(
+ data.get('cookies'), round_trip or encoded_cookies,
+ 'Cookie is not the same as round trip')
+ ydl.__dict__['_YoutubeDL__header_cookies'] = []
+
+ with self.subTest(msg=encoded_cookies):
+ if not error_re:
+ _test()
+ return
+ with self.assertRaisesRegex(Exception, error_re):
+ _test()
+
+ test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
+ test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
+ test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
+ cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
+ cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
+ test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [
+ cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)])
+ test('test="value; "; path=/test; domain=.yt.dlp', [
+ cookie('test', 'value; ', domain='.yt.dlp', path='/test')],
+ round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test')
+ test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')],
+ round_trip='name=""; Domain=.yt.dlp')
+
+ test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
+ test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
+ ydl.deprecated_feature = ydl.report_error
+ test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')
+
+ def test_infojson_cookies(self):
+ TEST_FILE = 'test_infojson_cookies.info.json'
+ TEST_URL = 'https://example.com/example.mp4'
+ COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
+ COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
+
+ ydl = FakeYDL()
+ ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
+
+ def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
+ fmt = {'url': TEST_URL}
+ if fmts_header_cookies:
+ fmt['http_headers'] = COOKIE_HEADER
+ if cookies_field:
+ fmt['cookies'] = COOKIES
+ return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
+
+ def test(initial_info, note):
+ result = {}
+ result['processed'] = ydl.process_ie_result(initial_info)
+ self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+ msg=f'No cookies set in cookiejar after initial process when {note}')
+ ydl.cookiejar.clear()
+ with open(TEST_FILE) as infojson:
+ result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
+ result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
+ self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+ msg=f'No cookies set in cookiejar after final process when {note}')
+ ydl.cookiejar.clear()
+ for key in ('processed', 'loaded', 'final'):
+ info = result[key]
+ self.assertIsNone(
+ traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
+ msg=f'Cookie header not removed in {key} result when {note}')
+ self.assertEqual(
+ traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
+ msg=f'No cookies field found in {key} result when {note}')
+
+ test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
+ test(make_info(info_header_cookies=True), 'info_dict header cokies')
+ test(make_info(fmts_header_cookies=True), 'format header cookies')
+ test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
+ test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
+ test(make_info(cookies_field=True), 'cookies format field')
+ test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
+
+ try_rm(TEST_FILE)
+
+ def test_add_headers_cookie(self):
+ def check_for_cookie_header(result):
+ return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
+
+ ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
+ ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com
+
+ fmt = {'url': 'https://example.com/video.mp4'}
+ result = ydl.process_ie_result(_make_result([fmt]), download=False)
+ self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
+ self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
+ self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
+
+ fmt = {'url': 'https://wrong.com/video.mp4'}
+ result = ydl.process_ie_result(_make_result([fmt]), download=False)
+ self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
+ self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
+ self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_YoutubeDLCookieJar.py b/test/test_YoutubeDLCookieJar.py
index 0d4e7dc97..fdb9baee5 100644
--- a/test/test_YoutubeDLCookieJar.py
+++ b/test/test_YoutubeDLCookieJar.py
@@ -11,16 +11,16 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import re
import tempfile
-from yt_dlp.utils import YoutubeDLCookieJar
+from yt_dlp.cookies import YoutubeDLCookieJar
class TestYoutubeDLCookieJar(unittest.TestCase):
def test_keep_session_cookies(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
tf = tempfile.NamedTemporaryFile(delete=False)
try:
- cookiejar.save(filename=tf.name, ignore_discard=True, ignore_expires=True)
+ cookiejar.save(filename=tf.name)
temp = tf.read().decode()
self.assertTrue(re.search(
r'www\.foobar\.foobar\s+FALSE\s+/\s+TRUE\s+0\s+YoutubeDLExpiresEmpty\s+YoutubeDLExpiresEmptyValue', temp))
@@ -32,7 +32,7 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
def test_strip_httponly_prefix(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
def assert_cookie_has_value(key):
self.assertEqual(cookiejar._cookies['www.foobar.foobar']['/'][key].value, key + '_VALUE')
@@ -42,11 +42,25 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
def test_malformed_cookies(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/malformed_cookies.txt')
- cookiejar.load(ignore_discard=True, ignore_expires=True)
+ cookiejar.load()
# Cookies should be empty since all malformed cookie file entries
# will be ignored
self.assertFalse(cookiejar._cookies)
+ def test_get_cookie_header(self):
+ cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
+ cookiejar.load()
+ header = cookiejar.get_cookie_header('https://www.foobar.foobar')
+ self.assertIn('HTTPONLY_COOKIE', header)
+
+ def test_get_cookies_for_url(self):
+ cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
+ cookiejar.load()
+ cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
+ self.assertEqual(len(cookies), 2)
+ cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
+ self.assertFalse(cookies)
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_aes.py b/test/test_aes.py
index 8e8fc0b3e..a26abfd7d 100644
--- a/test/test_aes.py
+++ b/test/test_aes.py
@@ -26,7 +26,7 @@ from yt_dlp.aes import (
key_expansion,
pad_block,
)
-from yt_dlp.dependencies import Cryptodome_AES
+from yt_dlp.dependencies import Cryptodome
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
@@ -48,7 +48,7 @@ class TestAES(unittest.TestCase):
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
- if Cryptodome_AES:
+ if Cryptodome.AES:
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
@@ -78,7 +78,7 @@ class TestAES(unittest.TestCase):
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
- if Cryptodome_AES:
+ if Cryptodome.AES:
decrypted = aes_gcm_decrypt_and_verify_bytes(
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
diff --git a/test/test_age_restriction.py b/test/test_age_restriction.py
index ff248432b..68107590e 100644
--- a/test/test_age_restriction.py
+++ b/test/test_age_restriction.py
@@ -10,6 +10,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import is_download_test, try_rm
from yt_dlp import YoutubeDL
+from yt_dlp.utils import DownloadError
def _download_restricted(url, filename, age):
@@ -25,10 +26,14 @@ def _download_restricted(url, filename, age):
ydl.add_default_info_extractors()
json_filename = os.path.splitext(filename)[0] + '.info.json'
try_rm(json_filename)
- ydl.download([url])
- res = os.path.exists(json_filename)
- try_rm(json_filename)
- return res
+ try:
+ ydl.download([url])
+ except DownloadError:
+ pass
+ else:
+ return os.path.exists(json_filename)
+ finally:
+ try_rm(json_filename)
@is_download_test
@@ -38,12 +43,12 @@ class TestAgeRestriction(unittest.TestCase):
self.assertFalse(_download_restricted(url, filename, age))
def test_youtube(self):
- self._assert_restricted('07FYdnEawAQ', '07FYdnEawAQ.mp4', 10)
+ self._assert_restricted('HtVdAasjOgU', 'HtVdAasjOgU.mp4', 10)
def test_youporn(self):
self._assert_restricted(
- 'http://www.youporn.com/watch/505835/sex-ed-is-it-safe-to-masturbate-daily/',
- '505835.mp4', 2, old_age=25)
+ 'https://www.youporn.com/watch/16715086/sex-ed-in-detention-18-asmr/',
+ '16715086.mp4', 2, old_age=25)
if __name__ == '__main__':
diff --git a/test/test_compat.py b/test/test_compat.py
index e3d775bc1..71ca7f99f 100644
--- a/test/test_compat.py
+++ b/test/test_compat.py
@@ -9,15 +9,16 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import struct
-import urllib.parse
from yt_dlp import compat
+from yt_dlp.compat import urllib # isort: split
from yt_dlp.compat import (
compat_etree_fromstring,
compat_expanduser,
compat_urllib_parse_unquote,
compat_urllib_parse_urlencode,
)
+from yt_dlp.compat.urllib.request import getproxies
class TestCompat(unittest.TestCase):
@@ -28,8 +29,10 @@ class TestCompat(unittest.TestCase):
with self.assertWarns(DeprecationWarning):
compat.WINDOWS_VT_MODE
- # TODO: Test submodule
- # compat.asyncio.events # Must not raise error
+ self.assertEqual(urllib.request.getproxies, getproxies)
+
+ with self.assertWarns(DeprecationWarning):
+ compat.compat_pycrypto_AES # Must not raise error
def test_compat_expanduser(self):
old_home = os.environ.get('HOME')
diff --git a/test/test_config.py b/test/test_config.py
new file mode 100644
index 000000000..a393b6534
--- /dev/null
+++ b/test/test_config.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+import unittest
+import unittest.mock
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import contextlib
+import itertools
+from pathlib import Path
+
+from yt_dlp.compat import compat_expanduser
+from yt_dlp.options import create_parser, parseOpts
+from yt_dlp.utils import Config, get_executable_path
+
+ENVIRON_DEFAULTS = {
+ 'HOME': None,
+ 'XDG_CONFIG_HOME': '/_xdg_config_home/',
+ 'USERPROFILE': 'C:/Users/testing/',
+ 'APPDATA': 'C:/Users/testing/AppData/Roaming/',
+ 'HOMEDRIVE': 'C:/',
+ 'HOMEPATH': 'Users/testing/',
+}
+
+
+@contextlib.contextmanager
+def set_environ(**kwargs):
+ saved_environ = os.environ.copy()
+
+ for name, value in {**ENVIRON_DEFAULTS, **kwargs}.items():
+ if value is None:
+ os.environ.pop(name, None)
+ else:
+ os.environ[name] = value
+
+ yield
+
+ os.environ.clear()
+ os.environ.update(saved_environ)
+
+
+def _generate_expected_groups():
+ xdg_config_home = os.getenv('XDG_CONFIG_HOME') or compat_expanduser('~/.config')
+ appdata_dir = os.getenv('appdata')
+ home_dir = compat_expanduser('~')
+ return {
+ 'Portable': [
+ Path(get_executable_path(), 'yt-dlp.conf'),
+ ],
+ 'Home': [
+ Path('yt-dlp.conf'),
+ ],
+ 'User': [
+ Path(xdg_config_home, 'yt-dlp.conf'),
+ Path(xdg_config_home, 'yt-dlp', 'config'),
+ Path(xdg_config_home, 'yt-dlp', 'config.txt'),
+ *((
+ Path(appdata_dir, 'yt-dlp.conf'),
+ Path(appdata_dir, 'yt-dlp', 'config'),
+ Path(appdata_dir, 'yt-dlp', 'config.txt'),
+ ) if appdata_dir else ()),
+ Path(home_dir, 'yt-dlp.conf'),
+ Path(home_dir, 'yt-dlp.conf.txt'),
+ Path(home_dir, '.yt-dlp', 'config'),
+ Path(home_dir, '.yt-dlp', 'config.txt'),
+ ],
+ 'System': [
+ Path('/etc/yt-dlp.conf'),
+ Path('/etc/yt-dlp/config'),
+ Path('/etc/yt-dlp/config.txt'),
+ ]
+ }
+
+
+class TestConfig(unittest.TestCase):
+ maxDiff = None
+
+ @set_environ()
+ def test_config__ENVIRON_DEFAULTS_sanity(self):
+ expected = make_expected()
+ self.assertCountEqual(
+ set(expected), expected,
+ 'ENVIRON_DEFAULTS produces non unique names')
+
+ def test_config_all_environ_values(self):
+ for name, value in ENVIRON_DEFAULTS.items():
+ for new_value in (None, '', '.', value or '/some/dir'):
+ with set_environ(**{name: new_value}):
+ self._simple_grouping_test()
+
+ def test_config_default_expected_locations(self):
+ files, _ = self._simple_config_test()
+ self.assertEqual(
+ files, make_expected(),
+ 'Not all expected locations have been checked')
+
+ def test_config_default_grouping(self):
+ self._simple_grouping_test()
+
+ def _simple_grouping_test(self):
+ expected_groups = make_expected_groups()
+ for name, group in expected_groups.items():
+ for index, existing_path in enumerate(group):
+ result, opts = self._simple_config_test(existing_path)
+ expected = expected_from_expected_groups(expected_groups, existing_path)
+ self.assertEqual(
+ result, expected,
+ f'The checked locations do not match the expected ({name}, {index})')
+ self.assertEqual(
+ opts.outtmpl['default'], '1',
+ f'The used result value was incorrect ({name}, {index})')
+
+ def _simple_config_test(self, *stop_paths):
+ encountered = 0
+ paths = []
+
+ def read_file(filename, default=[]):
+ nonlocal encountered
+ path = Path(filename)
+ paths.append(path)
+ if path in stop_paths:
+ encountered += 1
+ return ['-o', f'{encountered}']
+
+ with ConfigMock(read_file):
+ _, opts, _ = parseOpts([], False)
+
+ return paths, opts
+
+ @set_environ()
+ def test_config_early_exit_commandline(self):
+ self._early_exit_test(0, '--ignore-config')
+
+ @set_environ()
+ def test_config_early_exit_files(self):
+ for index, _ in enumerate(make_expected(), 1):
+ self._early_exit_test(index)
+
+ def _early_exit_test(self, allowed_reads, *args):
+ reads = 0
+
+ def read_file(filename, default=[]):
+ nonlocal reads
+ reads += 1
+
+ if reads > allowed_reads:
+ self.fail('The remaining config was not ignored')
+ elif reads == allowed_reads:
+ return ['--ignore-config']
+
+ with ConfigMock(read_file):
+ parseOpts(args, False)
+
+ @set_environ()
+ def test_config_override_commandline(self):
+ self._override_test(0, '-o', 'pass')
+
+ @set_environ()
+ def test_config_override_files(self):
+ for index, _ in enumerate(make_expected(), 1):
+ self._override_test(index)
+
+ def _override_test(self, start_index, *args):
+ index = 0
+
+ def read_file(filename, default=[]):
+ nonlocal index
+ index += 1
+
+ if index > start_index:
+ return ['-o', 'fail']
+ elif index == start_index:
+ return ['-o', 'pass']
+
+ with ConfigMock(read_file):
+ _, opts, _ = parseOpts(args, False)
+
+ self.assertEqual(
+ opts.outtmpl['default'], 'pass',
+ 'The earlier group did not override the later ones')
+
+
+@contextlib.contextmanager
+def ConfigMock(read_file=None):
+ with unittest.mock.patch('yt_dlp.options.Config') as mock:
+ mock.return_value = Config(create_parser())
+ if read_file is not None:
+ mock.read_file = read_file
+
+ yield mock
+
+
+def make_expected(*filepaths):
+ return expected_from_expected_groups(_generate_expected_groups(), *filepaths)
+
+
+def make_expected_groups(*filepaths):
+ return _filter_expected_groups(_generate_expected_groups(), filepaths)
+
+
+def expected_from_expected_groups(expected_groups, *filepaths):
+ return list(itertools.chain.from_iterable(
+ _filter_expected_groups(expected_groups, filepaths).values()))
+
+
+def _filter_expected_groups(expected, filepaths):
+ if not filepaths:
+ return expected
+
+ result = {}
+ for group, paths in expected.items():
+ new_paths = []
+ for path in paths:
+ new_paths.append(path)
+ if path in filepaths:
+ break
+
+ result[group] = new_paths
+
+ return result
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_cookies.py b/test/test_cookies.py
index 4155bcbf5..5282ef621 100644
--- a/test/test_cookies.py
+++ b/test/test_cookies.py
@@ -49,32 +49,38 @@ class TestCookies(unittest.TestCase):
""" based on https://chromium.googlesource.com/chromium/src/+/refs/heads/main/base/nix/xdg_util_unittest.cc """
test_cases = [
({}, _LinuxDesktopEnvironment.OTHER),
+ ({'DESKTOP_SESSION': 'my_custom_de'}, _LinuxDesktopEnvironment.OTHER),
+ ({'XDG_CURRENT_DESKTOP': 'my_custom_de'}, _LinuxDesktopEnvironment.OTHER),
({'DESKTOP_SESSION': 'gnome'}, _LinuxDesktopEnvironment.GNOME),
({'DESKTOP_SESSION': 'mate'}, _LinuxDesktopEnvironment.GNOME),
- ({'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE),
- ({'DESKTOP_SESSION': 'kde'}, _LinuxDesktopEnvironment.KDE),
+ ({'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE4),
+ ({'DESKTOP_SESSION': 'kde'}, _LinuxDesktopEnvironment.KDE3),
({'DESKTOP_SESSION': 'xfce'}, _LinuxDesktopEnvironment.XFCE),
({'GNOME_DESKTOP_SESSION_ID': 1}, _LinuxDesktopEnvironment.GNOME),
- ({'KDE_FULL_SESSION': 1}, _LinuxDesktopEnvironment.KDE),
+ ({'KDE_FULL_SESSION': 1}, _LinuxDesktopEnvironment.KDE3),
+ ({'KDE_FULL_SESSION': 1, 'DESKTOP_SESSION': 'kde4'}, _LinuxDesktopEnvironment.KDE4),
({'XDG_CURRENT_DESKTOP': 'X-Cinnamon'}, _LinuxDesktopEnvironment.CINNAMON),
+ ({'XDG_CURRENT_DESKTOP': 'Deepin'}, _LinuxDesktopEnvironment.DEEPIN),
({'XDG_CURRENT_DESKTOP': 'GNOME'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'GNOME:GNOME-Classic'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'GNOME : GNOME-Classic'}, _LinuxDesktopEnvironment.GNOME),
({'XDG_CURRENT_DESKTOP': 'Unity', 'DESKTOP_SESSION': 'gnome-fallback'}, _LinuxDesktopEnvironment.GNOME),
- ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '5'}, _LinuxDesktopEnvironment.KDE),
- ({'XDG_CURRENT_DESKTOP': 'KDE'}, _LinuxDesktopEnvironment.KDE),
+ ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '5'}, _LinuxDesktopEnvironment.KDE5),
+ ({'XDG_CURRENT_DESKTOP': 'KDE', 'KDE_SESSION_VERSION': '6'}, _LinuxDesktopEnvironment.KDE6),
+ ({'XDG_CURRENT_DESKTOP': 'KDE'}, _LinuxDesktopEnvironment.KDE4),
({'XDG_CURRENT_DESKTOP': 'Pantheon'}, _LinuxDesktopEnvironment.PANTHEON),
+ ({'XDG_CURRENT_DESKTOP': 'UKUI'}, _LinuxDesktopEnvironment.UKUI),
({'XDG_CURRENT_DESKTOP': 'Unity'}, _LinuxDesktopEnvironment.UNITY),
({'XDG_CURRENT_DESKTOP': 'Unity:Unity7'}, _LinuxDesktopEnvironment.UNITY),
({'XDG_CURRENT_DESKTOP': 'Unity:Unity8'}, _LinuxDesktopEnvironment.UNITY),
]
for env, expected_desktop_environment in test_cases:
- self.assertEqual(_get_linux_desktop_environment(env), expected_desktop_environment)
+ self.assertEqual(_get_linux_desktop_environment(env, Logger()), expected_desktop_environment)
def test_chrome_cookie_decryptor_linux_derive_key(self):
key = LinuxChromeCookieDecryptor.derive_key(b'abc')
diff --git a/test/test_download.py b/test/test_download.py
index 43b39c36b..6f00a4ded 100755
--- a/test/test_download.py
+++ b/test/test_download.py
@@ -10,10 +10,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import collections
import hashlib
-import http.client
import json
-import socket
-import urllib.error
from test.helper import (
assertGreaterEqual,
@@ -29,6 +26,7 @@ from test.helper import (
import yt_dlp.YoutubeDL # isort: split
from yt_dlp.extractor import get_info_extractor
+from yt_dlp.networking.exceptions import HTTPError, TransportError
from yt_dlp.utils import (
DownloadError,
ExtractorError,
@@ -162,8 +160,7 @@ def generator(test_case, tname):
force_generic_extractor=params.get('force_generic_extractor', False))
except (DownloadError, ExtractorError) as err:
# Check if the exception is not a network related one
- if (err.exc_info[0] not in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine)
- or (err.exc_info[0] == urllib.error.HTTPError and err.exc_info[1].code == 503)):
+ if not isinstance(err.exc_info[1], (TransportError, UnavailableVideoError)) or (isinstance(err.exc_info[1], HTTPError) and err.exc_info[1].status == 503):
err.msg = f'{getattr(err, "msg", err)} ({tname})'
raise
@@ -249,7 +246,7 @@ def generator(test_case, tname):
# extractor returns full results even with extract_flat
res_tcs = [{'info_dict': e} for e in res_dict['entries']]
try_rm_tcs_files(res_tcs)
-
+ ydl.close()
return test_template
diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py
new file mode 100644
index 000000000..62f7d45d4
--- /dev/null
+++ b/test/test_downloader_external.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import http.cookiejar
+
+from test.helper import FakeYDL
+from yt_dlp.downloader.external import (
+ Aria2cFD,
+ AxelFD,
+ CurlFD,
+ FFmpegFD,
+ HttpieFD,
+ WgetFD,
+)
+
+TEST_COOKIE = {
+ 'version': 0,
+ 'name': 'test',
+ 'value': 'ytdlp',
+ 'port': None,
+ 'port_specified': False,
+ 'domain': '.example.com',
+ 'domain_specified': True,
+ 'domain_initial_dot': False,
+ 'path': '/',
+ 'path_specified': True,
+ 'secure': False,
+ 'expires': None,
+ 'discard': False,
+ 'comment': None,
+ 'comment_url': None,
+ 'rest': {},
+}
+
+TEST_INFO = {'url': 'http://www.example.com/'}
+
+
+class TestHttpieFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = HttpieFD(ydl, {})
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['http', '--download', '--output', 'test', 'http://www.example.com/'])
+
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp'])
+
+
+class TestAxelFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = AxelFD(ydl, {})
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['axel', '-o', 'test', '--', 'http://www.example.com/'])
+
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertEqual(
+ downloader._make_cmd('test', TEST_INFO),
+ ['axel', '-o', 'test', '-H', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/'])
+
+
+class TestWgetFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = WgetFD(ydl, {})
+ self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
+ # Test cookiejar tempfile arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
+
+
+class TestCurlFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = CurlFD(ydl, {})
+ self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
+ # Test cookie header is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
+ self.assertIn('test=ytdlp', downloader._make_cmd('test', TEST_INFO))
+
+
+class TestAria2cFD(unittest.TestCase):
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = Aria2cFD(ydl, {})
+ downloader._make_cmd('test', TEST_INFO)
+ self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
+
+ # Test cookiejar tempfile arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ cmd = downloader._make_cmd('test', TEST_INFO)
+ self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
+
+
+@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
+class TestFFmpegFD(unittest.TestCase):
+ _args = []
+
+ def _test_cmd(self, args):
+ self._args = args
+
+ def test_make_cmd(self):
+ with FakeYDL() as ydl:
+ downloader = FFmpegFD(ydl, {})
+ downloader._debug_cmd = self._test_cmd
+
+ downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/',
+ '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+ # Test cookies arg is added
+ ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
+ downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n',
+ '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+ # Test with non-url input (ffmpeg reads from stdin '-' for websockets)
+ downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'})
+ self.assertEqual(self._args, [
+ 'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test'])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_downloader_http.py b/test/test_downloader_http.py
index 381b2583c..099ec2fff 100644
--- a/test/test_downloader_http.py
+++ b/test/test_downloader_http.py
@@ -16,6 +16,7 @@ from test.helper import http_server_port, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.downloader.http import HttpFD
from yt_dlp.utils import encodeFilename
+from yt_dlp.utils._utils import _YDLLogger as FakeLogger
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -67,17 +68,6 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
assert False
-class FakeLogger:
- def debug(self, msg):
- pass
-
- def warning(self, msg):
- pass
-
- def error(self, msg):
- pass
-
-
class TestHttpFD(unittest.TestCase):
def setUp(self):
self.httpd = http.server.HTTPServer(
diff --git a/test/test_http.py b/test/test_http.py
deleted file mode 100644
index 5ca0d7a47..000000000
--- a/test/test_http.py
+++ /dev/null
@@ -1,192 +0,0 @@
-#!/usr/bin/env python3
-
-# Allow direct execution
-import os
-import sys
-import unittest
-
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-
-import http.server
-import ssl
-import threading
-import urllib.request
-
-from test.helper import http_server_port
-from yt_dlp import YoutubeDL
-
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-
-
-class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
- def log_message(self, format, *args):
- pass
-
- def do_GET(self):
- if self.path == '/video.html':
- self.send_response(200)
- self.send_header('Content-Type', 'text/html; charset=utf-8')
- self.end_headers()
- self.wfile.write(b'<html><video src="/vid.mp4" /></html>')
- elif self.path == '/vid.mp4':
- self.send_response(200)
- self.send_header('Content-Type', 'video/mp4')
- self.end_headers()
- self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]')
- elif self.path == '/%E4%B8%AD%E6%96%87.html':
- self.send_response(200)
- self.send_header('Content-Type', 'text/html; charset=utf-8')
- self.end_headers()
- self.wfile.write(b'<html><video src="/vid.mp4" /></html>')
- else:
- assert False
-
-
-class FakeLogger:
- def debug(self, msg):
- pass
-
- def warning(self, msg):
- pass
-
- def error(self, msg):
- pass
-
-
-class TestHTTP(unittest.TestCase):
- def setUp(self):
- self.httpd = http.server.HTTPServer(
- ('127.0.0.1', 0), HTTPTestRequestHandler)
- self.port = http_server_port(self.httpd)
- self.server_thread = threading.Thread(target=self.httpd.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
-
-
-class TestHTTPS(unittest.TestCase):
- def setUp(self):
- certfn = os.path.join(TEST_DIR, 'testcert.pem')
- self.httpd = http.server.HTTPServer(
- ('127.0.0.1', 0), HTTPTestRequestHandler)
- sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- sslctx.load_cert_chain(certfn, None)
- self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
- self.port = http_server_port(self.httpd)
- self.server_thread = threading.Thread(target=self.httpd.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
-
- def test_nocheckcertificate(self):
- ydl = YoutubeDL({'logger': FakeLogger()})
- self.assertRaises(
- Exception,
- ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port)
-
- ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True})
- r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port)
- self.assertEqual(r['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port)
-
-
-class TestClientCert(unittest.TestCase):
- def setUp(self):
- certfn = os.path.join(TEST_DIR, 'testcert.pem')
- self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
- cacertfn = os.path.join(self.certdir, 'ca.crt')
- self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
- sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- sslctx.verify_mode = ssl.CERT_REQUIRED
- sslctx.load_verify_locations(cafile=cacertfn)
- sslctx.load_cert_chain(certfn, None)
- self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
- self.port = http_server_port(self.httpd)
- self.server_thread = threading.Thread(target=self.httpd.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
-
- def _run_test(self, **params):
- ydl = YoutubeDL({
- 'logger': FakeLogger(),
- # Disable client-side validation of unacceptable self-signed testcert.pem
- # The test is of a check on the server side, so unaffected
- 'nocheckcertificate': True,
- **params,
- })
- r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port)
- self.assertEqual(r['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port)
-
- def test_certificate_combined_nopass(self):
- self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))
-
- def test_certificate_nocombined_nopass(self):
- self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
- client_certificate_key=os.path.join(self.certdir, 'client.key'))
-
- def test_certificate_combined_pass(self):
- self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
- client_certificate_password='foobar')
-
- def test_certificate_nocombined_pass(self):
- self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
- client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
- client_certificate_password='foobar')
-
-
-def _build_proxy_handler(name):
- class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
- proxy_name = name
-
- def log_message(self, format, *args):
- pass
-
- def do_GET(self):
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain; charset=utf-8')
- self.end_headers()
- self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
- return HTTPTestRequestHandler
-
-
-class TestProxy(unittest.TestCase):
- def setUp(self):
- self.proxy = http.server.HTTPServer(
- ('127.0.0.1', 0), _build_proxy_handler('normal'))
- self.port = http_server_port(self.proxy)
- self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
- self.proxy_thread.daemon = True
- self.proxy_thread.start()
-
- self.geo_proxy = http.server.HTTPServer(
- ('127.0.0.1', 0), _build_proxy_handler('geo'))
- self.geo_port = http_server_port(self.geo_proxy)
- self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
- self.geo_proxy_thread.daemon = True
- self.geo_proxy_thread.start()
-
- def test_proxy(self):
- geo_proxy = f'127.0.0.1:{self.geo_port}'
- ydl = YoutubeDL({
- 'proxy': f'127.0.0.1:{self.port}',
- 'geo_verification_proxy': geo_proxy,
- })
- url = 'http://foo.com/bar'
- response = ydl.urlopen(url).read().decode()
- self.assertEqual(response, f'normal: {url}')
-
- req = urllib.request.Request(url)
- req.add_header('Ytdl-request-proxy', geo_proxy)
- response = ydl.urlopen(req).read().decode()
- self.assertEqual(response, f'geo: {url}')
-
- def test_proxy_with_idn(self):
- ydl = YoutubeDL({
- 'proxy': f'127.0.0.1:{self.port}',
- })
- url = 'http://中文.tw/'
- response = ydl.urlopen(url).read().decode()
- # b'xn--fiq228c' is '中文'.encode('idna')
- self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/test_jsinterp.py b/test/test_jsinterp.py
index 3c4391c4a..86928a6a0 100644
--- a/test/test_jsinterp.py
+++ b/test/test_jsinterp.py
@@ -8,410 +8,372 @@ import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import math
-import re
from yt_dlp.jsinterp import JS_Undefined, JSInterpreter
-class TestJSInterpreter(unittest.TestCase):
- def test_basic(self):
- jsi = JSInterpreter('function x(){;}')
- self.assertEqual(jsi.call_function('x'), None)
+class NaN:
+ pass
- jsi = JSInterpreter('function x3(){return 42;}')
- self.assertEqual(jsi.call_function('x3'), 42)
- jsi = JSInterpreter('function x3(){42}')
- self.assertEqual(jsi.call_function('x3'), None)
+class TestJSInterpreter(unittest.TestCase):
+ def _test(self, jsi_or_code, expected, func='f', args=()):
+ if isinstance(jsi_or_code, str):
+ jsi_or_code = JSInterpreter(jsi_or_code)
+ got = jsi_or_code.call_function(func, *args)
+ if expected is NaN:
+ self.assertTrue(math.isnan(got), f'{got} is not NaN')
+ else:
+ self.assertEqual(got, expected)
- jsi = JSInterpreter('var x5 = function(){return 42;}')
- self.assertEqual(jsi.call_function('x5'), 42)
+ def test_basic(self):
+ jsi = JSInterpreter('function f(){;}')
+ self.assertEqual(repr(jsi.extract_function('f')), 'F<f>')
+ self._test(jsi, None)
+
+ self._test('function f(){return 42;}', 42)
+ self._test('function f(){42}', None)
+ self._test('var f = function(){return 42;}', 42)
+
+ def test_add(self):
+ self._test('function f(){return 42 + 7;}', 49)
+ self._test('function f(){return 42 + undefined;}', NaN)
+ self._test('function f(){return 42 + null;}', 42)
+
+ def test_sub(self):
+ self._test('function f(){return 42 - 7;}', 35)
+ self._test('function f(){return 42 - undefined;}', NaN)
+ self._test('function f(){return 42 - null;}', 42)
+
+ def test_mul(self):
+ self._test('function f(){return 42 * 7;}', 294)
+ self._test('function f(){return 42 * undefined;}', NaN)
+ self._test('function f(){return 42 * null;}', 0)
+
+ def test_div(self):
+ jsi = JSInterpreter('function f(a, b){return a / b;}')
+ self._test(jsi, NaN, args=(0, 0))
+ self._test(jsi, NaN, args=(JS_Undefined, 1))
+ self._test(jsi, float('inf'), args=(2, 0))
+ self._test(jsi, 0, args=(0, 3))
+
+ def test_mod(self):
+ self._test('function f(){return 42 % 7;}', 0)
+ self._test('function f(){return 42 % 0;}', NaN)
+ self._test('function f(){return 42 % undefined;}', NaN)
+
+ def test_exp(self):
+ self._test('function f(){return 42 ** 2;}', 1764)
+ self._test('function f(){return 42 ** undefined;}', NaN)
+ self._test('function f(){return 42 ** null;}', 1)
+ self._test('function f(){return undefined ** 42;}', NaN)
def test_calc(self):
- jsi = JSInterpreter('function x4(a){return 2*a+1;}')
- self.assertEqual(jsi.call_function('x4', 3), 7)
+ self._test('function f(a){return 2*a+1;}', 7, args=[3])
def test_empty_return(self):
- jsi = JSInterpreter('function f(){return; y()}')
- self.assertEqual(jsi.call_function('f'), None)
+ self._test('function f(){return; y()}', None)
def test_morespace(self):
- jsi = JSInterpreter('function x (a) { return 2 * a + 1 ; }')
- self.assertEqual(jsi.call_function('x', 3), 7)
-
- jsi = JSInterpreter('function f () { x = 2 ; return x; }')
- self.assertEqual(jsi.call_function('f'), 2)
+ self._test('function f (a) { return 2 * a + 1 ; }', 7, args=[3])
+ self._test('function f () { x = 2 ; return x; }', 2)
def test_strange_chars(self):
- jsi = JSInterpreter('function $_xY1 ($_axY1) { var $_axY2 = $_axY1 + 1; return $_axY2; }')
- self.assertEqual(jsi.call_function('$_xY1', 20), 21)
+ self._test('function $_xY1 ($_axY1) { var $_axY2 = $_axY1 + 1; return $_axY2; }',
+ 21, args=[20], func='$_xY1')
def test_operators(self):
- jsi = JSInterpreter('function f(){return 1 << 5;}')
- self.assertEqual(jsi.call_function('f'), 32)
-
- jsi = JSInterpreter('function f(){return 2 ** 5}')
- self.assertEqual(jsi.call_function('f'), 32)
-
- jsi = JSInterpreter('function f(){return 19 & 21;}')
- self.assertEqual(jsi.call_function('f'), 17)
-
- jsi = JSInterpreter('function f(){return 11 >> 2;}')
- self.assertEqual(jsi.call_function('f'), 2)
-
- jsi = JSInterpreter('function f(){return []? 2+3: 4;}')
- self.assertEqual(jsi.call_function('f'), 5)
-
- jsi = JSInterpreter('function f(){return 1 == 2}')
- self.assertEqual(jsi.call_function('f'), False)
-
- jsi = JSInterpreter('function f(){return 0 && 1 || 2;}')
- self.assertEqual(jsi.call_function('f'), 2)
-
- jsi = JSInterpreter('function f(){return 0 ?? 42;}')
- self.assertEqual(jsi.call_function('f'), 0)
-
- jsi = JSInterpreter('function f(){return "life, the universe and everything" < 42;}')
- self.assertFalse(jsi.call_function('f'))
+ self._test('function f(){return 1 << 5;}', 32)
+ self._test('function f(){return 2 ** 5}', 32)
+ self._test('function f(){return 19 & 21;}', 17)
+ self._test('function f(){return 11 >> 2;}', 2)
+ self._test('function f(){return []? 2+3: 4;}', 5)
+ self._test('function f(){return 1 == 2}', False)
+ self._test('function f(){return 0 && 1 || 2;}', 2)
+ self._test('function f(){return 0 ?? 42;}', 0)
+ self._test('function f(){return "life, the universe and everything" < 42;}', False)
def test_array_access(self):
- jsi = JSInterpreter('function f(){var x = [1,2,3]; x[0] = 4; x[0] = 5; x[2.0] = 7; return x;}')
- self.assertEqual(jsi.call_function('f'), [5, 2, 7])
+ self._test('function f(){var x = [1,2,3]; x[0] = 4; x[0] = 5; x[2.0] = 7; return x;}', [5, 2, 7])
def test_parens(self):
- jsi = JSInterpreter('function f(){return (1) + (2) * ((( (( (((((3)))))) )) ));}')
- self.assertEqual(jsi.call_function('f'), 7)
-
- jsi = JSInterpreter('function f(){return (1 + 2) * 3;}')
- self.assertEqual(jsi.call_function('f'), 9)
+ self._test('function f(){return (1) + (2) * ((( (( (((((3)))))) )) ));}', 7)
+ self._test('function f(){return (1 + 2) * 3;}', 9)
def test_quotes(self):
- jsi = JSInterpreter(R'function f(){return "a\"\\("}')
- self.assertEqual(jsi.call_function('f'), R'a"\(')
+ self._test(R'function f(){return "a\"\\("}', R'a"\(')
def test_assignments(self):
- jsi = JSInterpreter('function f(){var x = 20; x = 30 + 1; return x;}')
- self.assertEqual(jsi.call_function('f'), 31)
-
- jsi = JSInterpreter('function f(){var x = 20; x += 30 + 1; return x;}')
- self.assertEqual(jsi.call_function('f'), 51)
-
- jsi = JSInterpreter('function f(){var x = 20; x -= 30 + 1; return x;}')
- self.assertEqual(jsi.call_function('f'), -11)
+ self._test('function f(){var x = 20; x = 30 + 1; return x;}', 31)
+ self._test('function f(){var x = 20; x += 30 + 1; return x;}', 51)
+ self._test('function f(){var x = 20; x -= 30 + 1; return x;}', -11)
+ @unittest.skip('Not implemented')
def test_comments(self):
- 'Skipping: Not yet fully implemented'
- return
- jsi = JSInterpreter('''
- function x() {
- var x = /* 1 + */ 2;
- var y = /* 30
- * 40 */ 50;
- return x + y;
- }
- ''')
- self.assertEqual(jsi.call_function('x'), 52)
-
- jsi = JSInterpreter('''
- function f() {
- var x = "/*";
- var y = 1 /* comment */ + 2;
- return y;
- }
- ''')
- self.assertEqual(jsi.call_function('f'), 3)
+ self._test('''
+ function f() {
+ var x = /* 1 + */ 2;
+ var y = /* 30
+ * 40 */ 50;
+ return x + y;
+ }
+ ''', 52)
+
+ self._test('''
+ function f() {
+ var x = "/*";
+ var y = 1 /* comment */ + 2;
+ return y;
+ }
+ ''', 3)
def test_precedence(self):
- jsi = JSInterpreter('''
- function x() {
- var a = [10, 20, 30, 40, 50];
- var b = 6;
- a[0]=a[b%a.length];
- return a;
- }''')
- self.assertEqual(jsi.call_function('x'), [20, 20, 30, 40, 50])
+ self._test('''
+ function f() {
+ var a = [10, 20, 30, 40, 50];
+ var b = 6;
+ a[0]=a[b%a.length];
+ return a;
+ }
+ ''', [20, 20, 30, 40, 50])
def test_builtins(self):
- jsi = JSInterpreter('''
- function x() { return NaN }
- ''')
- self.assertTrue(math.isnan(jsi.call_function('x')))
+ self._test('function f() { return NaN }', NaN)
- jsi = JSInterpreter('''
- function x() { return new Date('Wednesday 31 December 1969 18:01:26 MDT') - 0; }
- ''')
- self.assertEqual(jsi.call_function('x'), 86000)
- jsi = JSInterpreter('''
- function x(dt) { return new Date(dt) - 0; }
- ''')
- self.assertEqual(jsi.call_function('x', 'Wednesday 31 December 1969 18:01:26 MDT'), 86000)
+ def test_date(self):
+ self._test('function f() { return new Date("Wednesday 31 December 1969 18:01:26 MDT") - 0; }', 86000)
+
+ jsi = JSInterpreter('function f(dt) { return new Date(dt) - 0; }')
+ self._test(jsi, 86000, args=['Wednesday 31 December 1969 18:01:26 MDT'])
+ self._test(jsi, 86000, args=['12/31/1969 18:01:26 MDT']) # m/d/y
+ self._test(jsi, 0, args=['1 January 1970 00:00:00 UTC'])
def test_call(self):
jsi = JSInterpreter('''
- function x() { return 2; }
- function y(a) { return x() + (a?a:0); }
- function z() { return y(3); }
- ''')
- self.assertEqual(jsi.call_function('z'), 5)
- self.assertEqual(jsi.call_function('y'), 2)
+ function x() { return 2; }
+ function y(a) { return x() + (a?a:0); }
+ function z() { return y(3); }
+ ''')
+ self._test(jsi, 5, func='z')
+ self._test(jsi, 2, func='y')
+
+ def test_if(self):
+ self._test('''
+ function f() {
+ let a = 9;
+ if (0==0) {a++}
+ return a
+ }
+ ''', 10)
+
+ self._test('''
+ function f() {
+ if (0==0) {return 10}
+ }
+ ''', 10)
+
+ self._test('''
+ function f() {
+ if (0!=0) {return 1}
+ else {return 10}
+ }
+ ''', 10)
+
+ """ # Unsupported
+ self._test('''
+ function f() {
+ if (0!=0) {return 1}
+ else if (1==0) {return 2}
+ else {return 10}
+ }
+ ''', 10)
+ """
def test_for_loop(self):
- jsi = JSInterpreter('''
- function x() { a=0; for (i=0; i-10; i++) {a++} return a }
- ''')
- self.assertEqual(jsi.call_function('x'), 10)
+ self._test('function f() { a=0; for (i=0; i-10; i++) {a++} return a }', 10)
def test_switch(self):
jsi = JSInterpreter('''
- function x(f) { switch(f){
- case 1:f+=1;
- case 2:f+=2;
- case 3:f+=3;break;
- case 4:f+=4;
- default:f=0;
- } return f }
+ function f(x) { switch(x){
+ case 1:x+=1;
+ case 2:x+=2;
+ case 3:x+=3;break;
+ case 4:x+=4;
+ default:x=0;
+ } return x }
''')
- self.assertEqual(jsi.call_function('x', 1), 7)
- self.assertEqual(jsi.call_function('x', 3), 6)
- self.assertEqual(jsi.call_function('x', 5), 0)
+ self._test(jsi, 7, args=[1])
+ self._test(jsi, 6, args=[3])
+ self._test(jsi, 0, args=[5])
def test_switch_default(self):
jsi = JSInterpreter('''
- function x(f) { switch(f){
- case 2: f+=2;
- default: f-=1;
- case 5:
- case 6: f+=6;
- case 0: break;
- case 1: f+=1;
- } return f }
+ function f(x) { switch(x){
+ case 2: x+=2;
+ default: x-=1;
+ case 5:
+ case 6: x+=6;
+ case 0: break;
+ case 1: x+=1;
+ } return x }
''')
- self.assertEqual(jsi.call_function('x', 1), 2)
- self.assertEqual(jsi.call_function('x', 5), 11)
- self.assertEqual(jsi.call_function('x', 9), 14)
+ self._test(jsi, 2, args=[1])
+ self._test(jsi, 11, args=[5])
+ self._test(jsi, 14, args=[9])
def test_try(self):
- jsi = JSInterpreter('''
- function x() { try{return 10} catch(e){return 5} }
- ''')
- self.assertEqual(jsi.call_function('x'), 10)
+ self._test('function f() { try{return 10} catch(e){return 5} }', 10)
def test_catch(self):
- jsi = JSInterpreter('''
- function x() { try{throw 10} catch(e){return 5} }
- ''')
- self.assertEqual(jsi.call_function('x'), 5)
+ self._test('function f() { try{throw 10} catch(e){return 5} }', 5)
def test_finally(self):
- jsi = JSInterpreter('''
- function x() { try{throw 10} finally {return 42} }
- ''')
- self.assertEqual(jsi.call_function('x'), 42)
- jsi = JSInterpreter('''
- function x() { try{throw 10} catch(e){return 5} finally {return 42} }
- ''')
- self.assertEqual(jsi.call_function('x'), 42)
+ self._test('function f() { try{throw 10} finally {return 42} }', 42)
+ self._test('function f() { try{throw 10} catch(e){return 5} finally {return 42} }', 42)
def test_nested_try(self):
- jsi = JSInterpreter('''
- function x() {try {
- try{throw 10} finally {throw 42}
- } catch(e){return 5} }
- ''')
- self.assertEqual(jsi.call_function('x'), 5)
+ self._test('''
+ function f() {try {
+ try{throw 10} finally {throw 42}
+ } catch(e){return 5} }
+ ''', 5)
def test_for_loop_continue(self):
- jsi = JSInterpreter('''
- function x() { a=0; for (i=0; i-10; i++) { continue; a++ } return a }
- ''')
- self.assertEqual(jsi.call_function('x'), 0)
+ self._test('function f() { a=0; for (i=0; i-10; i++) { continue; a++ } return a }', 0)
def test_for_loop_break(self):
- jsi = JSInterpreter('''
- function x() { a=0; for (i=0; i-10; i++) { break; a++ } return a }
- ''')
- self.assertEqual(jsi.call_function('x'), 0)
+ self._test('function f() { a=0; for (i=0; i-10; i++) { break; a++ } return a }', 0)
def test_for_loop_try(self):
- jsi = JSInterpreter('''
- function x() {
- for (i=0; i-10; i++) { try { if (i == 5) throw i} catch {return 10} finally {break} };
- return 42 }
- ''')
- self.assertEqual(jsi.call_function('x'), 42)
+ self._test('''
+ function f() {
+ for (i=0; i-10; i++) { try { if (i == 5) throw i} catch {return 10} finally {break} };
+ return 42 }
+ ''', 42)
def test_literal_list(self):
- jsi = JSInterpreter('''
- function x() { return [1, 2, "asdf", [5, 6, 7]][3] }
- ''')
- self.assertEqual(jsi.call_function('x'), [5, 6, 7])
+ self._test('function f() { return [1, 2, "asdf", [5, 6, 7]][3] }', [5, 6, 7])
def test_comma(self):
- jsi = JSInterpreter('''
- function x() { a=5; a -= 1, a+=3; return a }
- ''')
- self.assertEqual(jsi.call_function('x'), 7)
-
- jsi = JSInterpreter('''
- function x() { a=5; return (a -= 1, a+=3, a); }
- ''')
- self.assertEqual(jsi.call_function('x'), 7)
-
- jsi = JSInterpreter('''
- function x() { return (l=[0,1,2,3], function(a, b){return a+b})((l[1], l[2]), l[3]) }
- ''')
- self.assertEqual(jsi.call_function('x'), 5)
+ self._test('function f() { a=5; a -= 1, a+=3; return a }', 7)
+ self._test('function f() { a=5; return (a -= 1, a+=3, a); }', 7)
+ self._test('function f() { return (l=[0,1,2,3], function(a, b){return a+b})((l[1], l[2]), l[3]) }', 5)
def test_void(self):
- jsi = JSInterpreter('''
- function x() { return void 42; }
- ''')
- self.assertEqual(jsi.call_function('x'), None)
+ self._test('function f() { return void 42; }', None)
def test_return_function(self):
jsi = JSInterpreter('''
- function x() { return [1, function(){return 1}][1] }
+ function f() { return [1, function(){return 1}][1] }
''')
- self.assertEqual(jsi.call_function('x')([]), 1)
+ self.assertEqual(jsi.call_function('f')([]), 1)
def test_null(self):
- jsi = JSInterpreter('''
- function x() { return null; }
- ''')
- self.assertEqual(jsi.call_function('x'), None)
-
- jsi = JSInterpreter('''
- function x() { return [null > 0, null < 0, null == 0, null === 0]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, False, False, False])
-
- jsi = JSInterpreter('''
- function x() { return [null >= 0, null <= 0]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [True, True])
+ self._test('function f() { return null; }', None)
+ self._test('function f() { return [null > 0, null < 0, null == 0, null === 0]; }',
+ [False, False, False, False])
+ self._test('function f() { return [null >= 0, null <= 0]; }', [True, True])
def test_undefined(self):
- jsi = JSInterpreter('''
- function x() { return undefined === undefined; }
- ''')
- self.assertEqual(jsi.call_function('x'), True)
-
- jsi = JSInterpreter('''
- function x() { return undefined; }
- ''')
- self.assertEqual(jsi.call_function('x'), JS_Undefined)
-
- jsi = JSInterpreter('''
- function x() { let v; return v; }
- ''')
- self.assertEqual(jsi.call_function('x'), JS_Undefined)
-
- jsi = JSInterpreter('''
- function x() { return [undefined === undefined, undefined == undefined, undefined < undefined, undefined > undefined]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [True, True, False, False])
-
- jsi = JSInterpreter('''
- function x() { return [undefined === 0, undefined == 0, undefined < 0, undefined > 0]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, False, False, False])
-
- jsi = JSInterpreter('''
- function x() { return [undefined >= 0, undefined <= 0]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, False])
-
- jsi = JSInterpreter('''
- function x() { return [undefined > null, undefined < null, undefined == null, undefined === null]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, False, True, False])
-
- jsi = JSInterpreter('''
- function x() { return [undefined === null, undefined == null, undefined < null, undefined > null]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, True, False, False])
-
- jsi = JSInterpreter('''
- function x() { let v; return [42+v, v+42, v**42, 42**v, 0**v]; }
- ''')
- for y in jsi.call_function('x'):
+ self._test('function f() { return undefined === undefined; }', True)
+ self._test('function f() { return undefined; }', JS_Undefined)
+ self._test('function f() {return undefined ?? 42; }', 42)
+ self._test('function f() { let v; return v; }', JS_Undefined)
+ self._test('function f() { let v; return v**0; }', 1)
+ self._test('function f() { let v; return [v>42, v<=42, v&&42, 42&&v]; }',
+ [False, False, JS_Undefined, JS_Undefined])
+
+ self._test('''
+ function f() { return [
+ undefined === undefined,
+ undefined == undefined,
+ undefined == null,
+ undefined < undefined,
+ undefined > undefined,
+ undefined === 0,
+ undefined == 0,
+ undefined < 0,
+ undefined > 0,
+ undefined >= 0,
+ undefined <= 0,
+ undefined > null,
+ undefined < null,
+ undefined === null
+ ]; }
+ ''', list(map(bool, (1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
+
+ jsi = JSInterpreter('''
+ function f() { let v; return [42+v, v+42, v**42, 42**v, 0**v]; }
+ ''')
+ for y in jsi.call_function('f'):
self.assertTrue(math.isnan(y))
- jsi = JSInterpreter('''
- function x() { let v; return v**0; }
- ''')
- self.assertEqual(jsi.call_function('x'), 1)
-
- jsi = JSInterpreter('''
- function x() { let v; return [v>42, v<=42, v&&42, 42&&v]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [False, False, JS_Undefined, JS_Undefined])
-
- jsi = JSInterpreter('function x(){return undefined ?? 42; }')
- self.assertEqual(jsi.call_function('x'), 42)
-
def test_object(self):
- jsi = JSInterpreter('''
- function x() { return {}; }
- ''')
- self.assertEqual(jsi.call_function('x'), {})
-
- jsi = JSInterpreter('''
- function x() { let a = {m1: 42, m2: 0 }; return [a["m1"], a.m2]; }
- ''')
- self.assertEqual(jsi.call_function('x'), [42, 0])
-
- jsi = JSInterpreter('''
- function x() { let a; return a?.qq; }
- ''')
- self.assertEqual(jsi.call_function('x'), JS_Undefined)
-
- jsi = JSInterpreter('''
- function x() { let a = {m1: 42, m2: 0 }; return a?.qq; }
- ''')
- self.assertEqual(jsi.call_function('x'), JS_Undefined)
+ self._test('function f() { return {}; }', {})
+ self._test('function f() { let a = {m1: 42, m2: 0 }; return [a["m1"], a.m2]; }', [42, 0])
+ self._test('function f() { let a; return a?.qq; }', JS_Undefined)
+ self._test('function f() { let a = {m1: 42, m2: 0 }; return a?.qq; }', JS_Undefined)
def test_regex(self):
- jsi = JSInterpreter('''
- function x() { let a=/,,[/,913,/](,)}/; }
- ''')
- self.assertEqual(jsi.call_function('x'), None)
-
- jsi = JSInterpreter('''
- function x() { let a=/,,[/,913,/](,)}/; return a; }
- ''')
- self.assertIsInstance(jsi.call_function('x'), re.Pattern)
-
- jsi = JSInterpreter('''
- function x() { let a=/,,[/,913,/](,)}/i; return a; }
- ''')
- self.assertEqual(jsi.call_function('x').flags & re.I, re.I)
-
- jsi = JSInterpreter(R'''
- function x() { let a=/,][}",],()}(\[)/; return a; }
- ''')
- self.assertEqual(jsi.call_function('x').pattern, r',][}",],()}(\[)')
-
- jsi = JSInterpreter(R'''
- function x() { let a=[/[)\\]/]; return a[0]; }
- ''')
- self.assertEqual(jsi.call_function('x').pattern, r'[)\\]')
+ self._test('function f() { let a=/,,[/,913,/](,)}/; }', None)
+ self._test('function f() { let a=/,,[/,913,/](,)}/; return a; }', R'/,,[/,913,/](,)}/0')
+
+ R''' # We are not compiling regex
+ jsi = JSInterpreter('function f() { let a=/,,[/,913,/](,)}/; return a; }')
+ self.assertIsInstance(jsi.call_function('f'), re.Pattern)
+
+ jsi = JSInterpreter('function f() { let a=/,,[/,913,/](,)}/i; return a; }')
+ self.assertEqual(jsi.call_function('f').flags & re.I, re.I)
+
+ jsi = JSInterpreter(R'function f() { let a=/,][}",],()}(\[)/; return a; }')
+ self.assertEqual(jsi.call_function('f').pattern, r',][}",],()}(\[)')
+
+ jsi = JSInterpreter(R'function f() { let a=[/[)\\]/]; return a[0]; }')
+ self.assertEqual(jsi.call_function('f').pattern, r'[)\\]')
+ '''
+
+ @unittest.skip('Not implemented')
+ def test_replace(self):
+ self._test('function f() { let a="data-name".replace("data-", ""); return a }',
+ 'name')
+ self._test('function f() { let a="data-name".replace(new RegExp("^.+-"), ""); return a; }',
+ 'name')
+ self._test('function f() { let a="data-name".replace(/^.+-/, ""); return a; }',
+ 'name')
+ self._test('function f() { let a="data-name".replace(/a/g, "o"); return a; }',
+ 'doto-nome')
+ self._test('function f() { let a="data-name".replaceAll("a", "o"); return a; }',
+ 'doto-nome')
def test_char_code_at(self):
- jsi = JSInterpreter('function x(i){return "test".charCodeAt(i)}')
- self.assertEqual(jsi.call_function('x', 0), 116)
- self.assertEqual(jsi.call_function('x', 1), 101)
- self.assertEqual(jsi.call_function('x', 2), 115)
- self.assertEqual(jsi.call_function('x', 3), 116)
- self.assertEqual(jsi.call_function('x', 4), None)
- self.assertEqual(jsi.call_function('x', 'not_a_number'), 116)
+ jsi = JSInterpreter('function f(i){return "test".charCodeAt(i)}')
+ self._test(jsi, 116, args=[0])
+ self._test(jsi, 101, args=[1])
+ self._test(jsi, 115, args=[2])
+ self._test(jsi, 116, args=[3])
+ self._test(jsi, None, args=[4])
+ self._test(jsi, 116, args=['not_a_number'])
def test_bitwise_operators_overflow(self):
- jsi = JSInterpreter('function x(){return -524999584 << 5}')
- self.assertEqual(jsi.call_function('x'), 379882496)
-
- jsi = JSInterpreter('function x(){return 1236566549 << 5}')
- self.assertEqual(jsi.call_function('x'), 915423904)
+ self._test('function f(){return -524999584 << 5}', 379882496)
+ self._test('function f(){return 1236566549 << 5}', 915423904)
+
+ def test_bitwise_operators_typecast(self):
+ self._test('function f(){return null << 5}', 0)
+ self._test('function f(){return undefined >> 5}', 0)
+ self._test('function f(){return 42 << NaN}', 42)
+
+ def test_negative(self):
+ self._test('function f(){return 2 * -2.0 ;}', -4)
+ self._test('function f(){return 2 - - -2 ;}', 0)
+ self._test('function f(){return 2 - - - -2 ;}', 4)
+ self._test('function f(){return 2 - + + - -2;}', 0)
+ self._test('function f(){return 2 + - + - -2;}', 0)
+
+ @unittest.skip('Not implemented')
+ def test_packed(self):
+ jsi = JSInterpreter('''function f(p,a,c,k,e,d){while(c--)if(k[c])p=p.replace(new RegExp('\\b'+c.toString(a)+'\\b','g'),k[c]);return p}''')
+ self.assertEqual(jsi.call_function('f', '''h 7=g("1j");7.7h({7g:[{33:"w://7f-7e-7d-7c.v.7b/7a/79/78/77/76.74?t=73&s=2s&e=72&f=2t&71=70.0.0.1&6z=6y&6x=6w"}],6v:"w://32.v.u/6u.31",16:"r%",15:"r%",6t:"6s",6r:"",6q:"l",6p:"l",6o:"6n",6m:\'6l\',6k:"6j",9:[{33:"/2u?b=6i&n=50&6h=w://32.v.u/6g.31",6f:"6e"}],1y:{6d:1,6c:\'#6b\',6a:\'#69\',68:"67",66:30,65:r,},"64":{63:"%62 2m%m%61%5z%5y%5x.u%5w%5v%5u.2y%22 2k%m%1o%22 5t%m%1o%22 5s%m%1o%22 2j%m%5r%22 16%m%5q%22 15%m%5p%22 5o%2z%5n%5m%2z",5l:"w://v.u/d/1k/5k.2y",5j:[]},\'5i\':{"5h":"5g"},5f:"5e",5d:"w://v.u",5c:{},5b:l,1x:[0.25,0.50,0.75,1,1.25,1.5,2]});h 1m,1n,5a;h 59=0,58=0;h 7=g("1j");h 2x=0,57=0,56=0;$.55({54:{\'53-52\':\'2i-51\'}});7.j(\'4z\',6(x){c(5>0&&x.1l>=5&&1n!=1){1n=1;$(\'q.4y\').4x(\'4w\')}});7.j(\'13\',6(x){2x=x.1l});7.j(\'2g\',6(x){2w(x)});7.j(\'4v\',6(){$(\'q.2v\').4u()});6 2w(x){$(\'q.2v\').4t();c(1m)19;1m=1;17=0;c(4s.4r===l){17=1}$.4q(\'/2u?b=4p&2l=1k&4o=2t-4n-4m-2s-4l&4k=&4j=&4i=&17=\'+17,6(2r){$(\'#4h\').4g(2r)});$(\'.3-8-4f-4e:4d("4c")\').2h(6(e){2q();g().4b(0);g().4a(l)});6 2q(){h $14=$("<q />").2p({1l:"49",16:"r%",15:"r%",48:0,2n:0,2o:47,46:"45(10%, 10%, 10%, 0.4)","44-43":"42"});$("<41 />").2p({16:"60%",15:"60%",2o:40,"3z-2n":"3y"}).3x({\'2m\':\'/?b=3w&2l=1k\',\'2k\':\'0\',\'2j\':\'2i\'}).2f($14);$14.2h(6(){$(3v).3u();g().2g()});$14.2f($(\'#1j\'))}g().13(0);}6 3t(){h 9=7.1b(2e);2d.2c(9);c(9.n>1){1r(i=0;i<9.n;i++){c(9[i].1a==2e){2d.2c(\'!!=\'+i);7.1p(i)}}}}7.j(\'3s\',6(){g().1h("/2a/3r.29","3q 10 28",6(){g().13(g().27()+10)},"2b");$("q[26=2b]").23().21(\'.3-20-1z\');g().1h("/2a/3p.29","3o 10 28",6(){h 12=g().27()-10;c(12<0)12=0;g().13(12)},"24");$("q[26=24]").23().21(\'.3-20-1z\');});6 1i(){}7.j(\'3n\',6(){1i()});7.j(\'3m\',6(){1i()});7.j("k",6(y){h 9=7.1b();c(9.n<2)19;$(\'.3-8-3l-3k\').3j(6(){$(\'#3-8-a-k\').1e(\'3-8-a-z\');$(\'.3-a-k\').p(\'o-1f\',\'11\')});7.1h("/3i/3h.3g","3f 3e",6(){$(\'.3-1w\').3d(\'3-8-1v\');$(\'.3-8-1y, .3-8-1x\').p(\'o-1g\',\'11\');c($(\'.3-1w\').3c(\'3-8-1v\')){$(\'.3-a-k\').p(\'o-1g\',\'l\');$(\'.3-a-k\').p(\'o-1f\',\'l\');$(\'.3-8-a\').1e(\'3-8-a-z\');$(\'.3-8-a:1u\').3b(\'3-8-a-z\')}3a{$(\'.3-a-k\').p(\'o-1g\',\'11\');$(\'.3-a-k\').p(\'o-1f\',\'11\');$(\'.3-8-a:1u\').1e(\'3-8-a-z\')}},"39");7.j("38",6(y){1d.37(\'1c\',y.9[y.36].1a)});c(1d.1t(\'1c\')){35("1s(1d.1t(\'1c\'));",34)}});h 18;6 1s(1q){h 9=7.1b();c(9.n>1){1r(i=0;i<9.n;i++){c(9[i].1a==1q){c(i==18){19}18=i;7.1p(i)}}}}',36,270,'|||jw|||function|player|settings|tracks|submenu||if||||jwplayer|var||on|audioTracks|true|3D|length|aria|attr|div|100|||sx|filemoon|https||event|active||false|tt|seek|dd|height|width|adb|current_audio|return|name|getAudioTracks|default_audio|localStorage|removeClass|expanded|checked|addButton|callMeMaybe|vplayer|0fxcyc2ajhp1|position|vvplay|vvad|220|setCurrentAudioTrack|audio_name|for|audio_set|getItem|last|open|controls|playbackRates|captions|rewind|icon|insertAfter||detach|ff00||button|getPosition|sec|png|player8|ff11|log|console|track_name|appendTo|play|click|no|scrolling|frameborder|file_code|src|top|zIndex|css|showCCform|data|1662367683|383371|dl|video_ad|doPlay|prevt|mp4|3E||jpg|thumbs|file|300|setTimeout|currentTrack|setItem|audioTrackChanged|dualSound|else|addClass|hasClass|toggleClass|Track|Audio|svg|dualy|images|mousedown|buttons|topbar|playAttemptFailed|beforePlay|Rewind|fr|Forward|ff|ready|set_audio_track|remove|this|upload_srt|prop|50px|margin|1000001|iframe|center|align|text|rgba|background|1000000|left|absolute|pause|setCurrentCaptions|Upload|contains|item|content|html|fviews|referer|prem|embed|3e57249ef633e0d03bf76ceb8d8a4b65|216|83|hash|view|get|TokenZir|window|hide|show|complete|slow|fadeIn|video_ad_fadein|time||cache|Cache|Content|headers|ajaxSetup|v2done|tott|vastdone2|vastdone1|vvbefore|playbackRateControls|cast|aboutlink|FileMoon|abouttext|UHD|1870|qualityLabels|sites|GNOME_POWER|link|2Fiframe|3C|allowfullscreen|22360|22640|22no|marginheight|marginwidth|2FGNOME_POWER|2F0fxcyc2ajhp1|2Fe|2Ffilemoon|2F|3A||22https|3Ciframe|code|sharing|fontOpacity|backgroundOpacity|Tahoma|fontFamily|303030|backgroundColor|FFFFFF|color|userFontScale|thumbnails|kind|0fxcyc2ajhp10000|url|get_slides|start|startparam|none|preload|html5|primary|hlshtml|androidhls|duration|uniform|stretching|0fxcyc2ajhp1_xt|image|2048|sp|6871|asn|127|srv|43200|_g3XlBcu2lmD9oDexD2NLWSmah2Nu3XcDrl93m9PwXY|m3u8||master|0fxcyc2ajhp1_x|00076|01|hls2|to|s01|delivery|storage|moon|sources|setup'''.split('|')))
if __name__ == '__main__':
diff --git a/test/test_networking.py b/test/test_networking.py
new file mode 100644
index 000000000..5308c8d6f
--- /dev/null
+++ b/test/test_networking.py
@@ -0,0 +1,1439 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import gzip
+import http.client
+import http.cookiejar
+import http.server
+import io
+import pathlib
+import random
+import ssl
+import tempfile
+import threading
+import time
+import urllib.error
+import urllib.request
+import warnings
+import zlib
+from email.message import Message
+from http.cookiejar import CookieJar
+
+from test.helper import FakeYDL, http_server_port
+from yt_dlp.cookies import YoutubeDLCookieJar
+from yt_dlp.dependencies import brotli
+from yt_dlp.networking import (
+ HEADRequest,
+ PUTRequest,
+ Request,
+ RequestDirector,
+ RequestHandler,
+ Response,
+)
+from yt_dlp.networking._urllib import UrllibRH
+from yt_dlp.networking.exceptions import (
+ CertificateVerifyError,
+ HTTPError,
+ IncompleteRead,
+ NoSupportingHandlers,
+ RequestError,
+ SSLError,
+ TransportError,
+ UnsupportedRequest,
+)
+from yt_dlp.utils._utils import _YDLLogger as FakeLogger
+from yt_dlp.utils.networking import HTTPHeaderDict
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def _build_proxy_handler(name):
+ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
+ proxy_name = name
+
+ def log_message(self, format, *args):
+ pass
+
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain; charset=utf-8')
+ self.end_headers()
+ self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode())
+ return HTTPTestRequestHandler
+
+
+class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
+ protocol_version = 'HTTP/1.1'
+
+ def log_message(self, format, *args):
+ pass
+
+ def _headers(self):
+ payload = str(self.headers).encode()
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+
+ def _redirect(self):
+ self.send_response(int(self.path[len('/redirect_'):]))
+ self.send_header('Location', '/method')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+
+ def _method(self, method, payload=None):
+ self.send_response(200)
+ self.send_header('Content-Length', str(len(payload or '')))
+ self.send_header('Method', method)
+ self.end_headers()
+ if payload:
+ self.wfile.write(payload)
+
+ def _status(self, status):
+ payload = f'<html>{status} NOT FOUND</html>'.encode()
+ self.send_response(int(status))
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+
+ def _read_data(self):
+ if 'Content-Length' in self.headers:
+ return self.rfile.read(int(self.headers['Content-Length']))
+
+ def do_POST(self):
+ data = self._read_data() + str(self.headers).encode()
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('POST', data)
+ elif self.path.startswith('/headers'):
+ self._headers()
+ else:
+ self._status(404)
+
+ def do_HEAD(self):
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('HEAD')
+ else:
+ self._status(404)
+
+ def do_PUT(self):
+ data = self._read_data() + str(self.headers).encode()
+ if self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('PUT', data)
+ else:
+ self._status(404)
+
+ def do_GET(self):
+ if self.path == '/video.html':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/vid.mp4':
+ payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
+ self.send_response(200)
+ self.send_header('Content-Type', 'video/mp4')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/%E4%B8%AD%E6%96%87.html':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path == '/%c7%9f':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/redirect_loop'):
+ self.send_response(301)
+ self.send_header('Location', self.path)
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/redirect_dotsegments':
+ self.send_response(301)
+ # redirect to /headers but with dot segments before
+ self.send_header('Location', '/a/b/./../../headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path.startswith('/redirect_'):
+ self._redirect()
+ elif self.path.startswith('/method'):
+ self._method('GET', str(self.headers).encode())
+ elif self.path.startswith('/headers'):
+ self._headers()
+ elif self.path.startswith('/308-to-headers'):
+ self.send_response(308)
+ self.send_header('Location', '/headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/trailing_garbage':
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Encoding', 'gzip')
+ buf = io.BytesIO()
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+ f.write(payload)
+ compressed = buf.getvalue() + b'trailing garbage'
+ self.send_header('Content-Length', str(len(compressed)))
+ self.end_headers()
+ self.wfile.write(compressed)
+ elif self.path == '/302-non-ascii-redirect':
+ new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
+ self.send_response(301)
+ self.send_header('Location', new_url)
+ self.send_header('Content-Length', '0')
+ self.end_headers()
+ elif self.path == '/content-encoding':
+ encodings = self.headers.get('ytdl-encoding', '')
+ payload = b'<html><video src="/vid.mp4" /></html>'
+ for encoding in filter(None, (e.strip() for e in encodings.split(','))):
+ if encoding == 'br' and brotli:
+ payload = brotli.compress(payload)
+ elif encoding == 'gzip':
+ buf = io.BytesIO()
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
+ f.write(payload)
+ payload = buf.getvalue()
+ elif encoding == 'deflate':
+ payload = zlib.compress(payload)
+ elif encoding == 'unsupported':
+ payload = b'raw'
+ break
+ else:
+ self._status(415)
+ return
+ self.send_response(200)
+ self.send_header('Content-Encoding', encodings)
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/gen_'):
+ payload = b'<html></html>'
+ self.send_response(int(self.path[len('/gen_'):]))
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ elif self.path.startswith('/incompleteread'):
+ payload = b'<html></html>'
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', '234234')
+ self.end_headers()
+ self.wfile.write(payload)
+ self.finish()
+ elif self.path.startswith('/timeout_'):
+ time.sleep(int(self.path[len('/timeout_'):]))
+ self._headers()
+ elif self.path == '/source_address':
+ payload = str(self.client_address[0]).encode()
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload)
+ self.finish()
+ else:
+ self._status(404)
+
+ def send_header(self, keyword, value):
+ """
+ Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
+ This is against what is defined in RFC 3986, however we need to test we support this
+ since some sites incorrectly do this.
+ """
+ if keyword.lower() == 'connection':
+ return super().send_header(keyword, value)
+
+ if not hasattr(self, '_headers_buffer'):
+ self._headers_buffer = []
+
+ self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
+
+
+def validate_and_send(rh, req):
+ rh.validate(req)
+ return rh.send(req)
+
+
+class TestRequestHandlerBase:
+ @classmethod
+ def setup_class(cls):
+ cls.http_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ cls.http_port = http_server_port(cls.http_httpd)
+ cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
+ # FIXME: we should probably stop the http server thread after each test
+ # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
+ cls.http_server_thread.daemon = True
+ cls.http_server_thread.start()
+
+ # HTTPS server
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ cls.https_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.load_cert_chain(certfn, None)
+ cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
+ cls.https_port = http_server_port(cls.https_httpd)
+ cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
+ cls.https_server_thread.daemon = True
+ cls.https_server_thread.start()
+
+
+class TestHTTPRequestHandler(TestRequestHandlerBase):
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_verify_cert(self, handler):
+ with handler() as rh:
+ with pytest.raises(CertificateVerifyError):
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+
+ with handler(verify=False) as rh:
+ r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+ assert r.status == 200
+ r.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_ssl_error(self, handler):
+ # HTTPS server with too old TLS version
+ # XXX: is there a better way to test this than to create a new server?
+ https_httpd = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
+ https_port = http_server_port(https_httpd)
+ https_server_thread = threading.Thread(target=https_httpd.serve_forever)
+ https_server_thread.daemon = True
+ https_server_thread.start()
+
+ with handler(verify=False) as rh:
+ with pytest.raises(SSLError, match='sslv3 alert handshake failure') as exc_info:
+ validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
+ assert not issubclass(exc_info.type, CertificateVerifyError)
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_percent_encode(self, handler):
+ with handler() as rh:
+ # Unicode characters should be encoded with uppercase percent-encoding
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
+ assert res.status == 200
+ res.close()
+ # don't normalize existing percent encodings
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
+ assert res.status == 200
+ res.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_remove_dot_segments(self, handler):
+ with handler() as rh:
+ # This isn't a comprehensive test,
+ # but it should be enough to check whether the handler is removing dot segments
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/a/b/./../../headers'))
+ assert res.status == 200
+ assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+ res.close()
+
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_dotsegments'))
+ assert res.status == 200
+ assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
+ res.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_unicode_path_redirection(self, handler):
+ with handler() as rh:
+ r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
+ assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
+ r.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_raise_http_error(self, handler):
+ with handler() as rh:
+ for bad_status in (400, 500, 599, 302):
+ with pytest.raises(HTTPError):
+ validate_and_send(rh, Request('http://127.0.0.1:%d/gen_%d' % (self.http_port, bad_status)))
+
+ # Should not raise an error
+ validate_and_send(rh, Request('http://127.0.0.1:%d/gen_200' % self.http_port)).close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_response_url(self, handler):
+ with handler() as rh:
+ # Response url should be that of the last url in redirect chain
+ res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
+ assert res.url == f'http://127.0.0.1:{self.http_port}/method'
+ res.close()
+ res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
+ assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
+ res2.close()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_redirect(self, handler):
+ with handler() as rh:
+ def do_req(redirect_status, method, assert_no_content=False):
+ data = b'testdata' if method in ('POST', 'PUT') else None
+ res = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
+
+ headers = b''
+ data_sent = b''
+ if data is not None:
+ data_sent += res.read(len(data))
+ if data_sent != data:
+ headers += data_sent
+ data_sent = b''
+
+ headers += res.read()
+
+ if assert_no_content or data is None:
+ assert b'Content-Type' not in headers
+ assert b'Content-Length' not in headers
+ else:
+ assert b'Content-Type' in headers
+ assert b'Content-Length' in headers
+
+ return data_sent.decode(), res.headers.get('method', '')
+
+ # A 303 must either use GET or HEAD for subsequent request
+ assert do_req(303, 'POST', True) == ('', 'GET')
+ assert do_req(303, 'HEAD') == ('', 'HEAD')
+
+ assert do_req(303, 'PUT', True) == ('', 'GET')
+
+ # 301 and 302 turn POST only into a GET
+ assert do_req(301, 'POST', True) == ('', 'GET')
+ assert do_req(301, 'HEAD') == ('', 'HEAD')
+ assert do_req(302, 'POST', True) == ('', 'GET')
+ assert do_req(302, 'HEAD') == ('', 'HEAD')
+
+ assert do_req(301, 'PUT') == ('testdata', 'PUT')
+ assert do_req(302, 'PUT') == ('testdata', 'PUT')
+
+ # 307 and 308 should not change method
+ for m in ('POST', 'PUT'):
+ assert do_req(307, m) == ('testdata', m)
+ assert do_req(308, m) == ('testdata', m)
+
+ assert do_req(307, 'HEAD') == ('', 'HEAD')
+ assert do_req(308, 'HEAD') == ('', 'HEAD')
+
+ # These should not redirect and instead raise an HTTPError
+ for code in (300, 304, 305, 306):
+ with pytest.raises(HTTPError):
+ do_req(code, 'GET')
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_request_cookie_header(self, handler):
+ # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
+ with handler() as rh:
+ # Specified Cookie header should be used
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/headers',
+ headers={'Cookie': 'test=test'})).read().decode()
+ assert 'Cookie: test=test' in res
+
+ # Specified Cookie header should be removed on any redirect
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/308-to-headers',
+ headers={'Cookie': 'test=test'})).read().decode()
+ assert 'Cookie: test=test' not in res
+
+ # Specified Cookie header should override global cookiejar for that request
+ cookiejar = YoutubeDLCookieJar()
+ cookiejar.set_cookie(http.cookiejar.Cookie(
+ version=0, name='test', value='ytdlp', port=None, port_specified=False,
+ domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
+ path_specified=True, secure=False, expires=None, discard=False, comment=None,
+ comment_url=None, rest={}))
+
+ with handler(cookiejar=cookiejar) as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test'})).read()
+ assert b'Cookie: test=ytdlp' not in data
+ assert b'Cookie: test=test' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_redirect_loop(self, handler):
+ with handler() as rh:
+ with pytest.raises(HTTPError, match='redirect loop'):
+ validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_incompleteread(self, handler):
+ with handler(timeout=2) as rh:
+ with pytest.raises(IncompleteRead):
+ validate_and_send(rh, Request('http://127.0.0.1:%d/incompleteread' % self.http_port)).read()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_cookies(self, handler):
+ cookiejar = YoutubeDLCookieJar()
+ cookiejar.set_cookie(http.cookiejar.Cookie(
+ 0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
+ False, '/headers', True, False, None, False, None, None, {}))
+
+ with handler(cookiejar=cookiejar) as rh:
+ data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
+ assert b'Cookie: test=ytdlp' in data
+
+ # Per request
+ with handler() as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
+ assert b'Cookie: test=ytdlp' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_headers(self, handler):
+
+ with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
+ # Global Headers
+ data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
+ assert b'Test1: test' in data
+
+ # Per request headers, merged with global
+ data = validate_and_send(rh, Request(
+ f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read()
+ assert b'Test1: test' in data
+ assert b'Test2: changed' in data
+ assert b'Test2: test2' not in data
+ assert b'Test3: test3' in data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_timeout(self, handler):
+ with handler() as rh:
+ # Default timeout is 20 seconds, so this should go through
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_3'))
+
+ with handler(timeout=0.5) as rh:
+ with pytest.raises(TransportError):
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
+
+ # Per request timeout, should override handler timeout
+ validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_source_address(self, handler):
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(source_address=source_address) as rh:
+ data = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/source_address')).read().decode()
+ assert source_address == data
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_gzip_trailing_garbage(self, handler):
+ with handler() as rh:
+ data = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode()
+ assert data == '<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
+ def test_brotli(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'br'}))
+ assert res.headers.get('Content-Encoding') == 'br'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_deflate(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'deflate'}))
+ assert res.headers.get('Content-Encoding') == 'deflate'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_gzip(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'gzip'}))
+ assert res.headers.get('Content-Encoding') == 'gzip'
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_multiple_encodings(self, handler):
+ with handler() as rh:
+ for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': pair}))
+ assert res.headers.get('Content-Encoding') == pair
+ assert res.read() == b'<html><video src="/vid.mp4" /></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_unsupported_encoding(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
+ headers={'ytdl-encoding': 'unsupported'}))
+ assert res.headers.get('Content-Encoding') == 'unsupported'
+ assert res.read() == b'raw'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_read(self, handler):
+ with handler() as rh:
+ res = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers'))
+ assert res.readable()
+ assert res.read(1) == b'H'
+ assert res.read(3) == b'ost'
+
+
+class TestHTTPProxy(TestRequestHandlerBase):
+ @classmethod
+ def setup_class(cls):
+ super().setup_class()
+ # HTTP Proxy server
+ cls.proxy = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), _build_proxy_handler('normal'))
+ cls.proxy_port = http_server_port(cls.proxy)
+ cls.proxy_thread = threading.Thread(target=cls.proxy.serve_forever)
+ cls.proxy_thread.daemon = True
+ cls.proxy_thread.start()
+
+ # Geo proxy server
+ cls.geo_proxy = http.server.ThreadingHTTPServer(
+ ('127.0.0.1', 0), _build_proxy_handler('geo'))
+ cls.geo_port = http_server_port(cls.geo_proxy)
+ cls.geo_proxy_thread = threading.Thread(target=cls.geo_proxy.serve_forever)
+ cls.geo_proxy_thread.daemon = True
+ cls.geo_proxy_thread.start()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_proxy(self, handler):
+ http_proxy = f'http://127.0.0.1:{self.proxy_port}'
+ geo_proxy = f'http://127.0.0.1:{self.geo_port}'
+
+ # Test global http proxy
+ # Test per request http proxy
+ # Test per request http proxy disables proxy
+ url = 'http://foo.com/bar'
+
+ # Global HTTP proxy
+ with handler(proxies={'http': http_proxy}) as rh:
+ res = validate_and_send(rh, Request(url)).read().decode()
+ assert res == f'normal: {url}'
+
+ # Per request proxy overrides global
+ res = validate_and_send(rh, Request(url, proxies={'http': geo_proxy})).read().decode()
+ assert res == f'geo: {url}'
+
+ # and setting to None disables all proxies for that request
+ real_url = f'http://127.0.0.1:{self.http_port}/headers'
+ res = validate_and_send(
+ rh, Request(real_url, proxies={'http': None})).read().decode()
+ assert res != f'normal: {real_url}'
+ assert 'Accept' in res
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_noproxy(self, handler):
+ with handler(proxies={'proxy': f'http://127.0.0.1:{self.proxy_port}'}) as rh:
+ # NO_PROXY
+ for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
+ nop_response = validate_and_send(
+ rh, Request(f'http://127.0.0.1:{self.http_port}/headers', proxies={'no': no_proxy})).read().decode(
+ 'utf-8')
+ assert 'Accept' in nop_response
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_allproxy(self, handler):
+ url = 'http://foo.com/bar'
+ with handler() as rh:
+ response = validate_and_send(rh, Request(url, proxies={'all': f'http://127.0.0.1:{self.proxy_port}'})).read().decode(
+ 'utf-8')
+ assert response == f'normal: {url}'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_proxy_with_idn(self, handler):
+ with handler(proxies={
+ 'http': f'http://127.0.0.1:{self.proxy_port}',
+ }) as rh:
+ url = 'http://中文.tw/'
+ response = rh.send(Request(url)).read().decode()
+ # b'xn--fiq228c' is '中文'.encode('idna')
+ assert response == 'normal: http://xn--fiq228c.tw/'
+
+
+class TestClientCertificate:
+
+ @classmethod
+ def setup_class(cls):
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
+ cacertfn = os.path.join(cls.certdir, 'ca.crt')
+ cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.verify_mode = ssl.CERT_REQUIRED
+ sslctx.load_verify_locations(cafile=cacertfn)
+ sslctx.load_cert_chain(certfn, None)
+ cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)
+ cls.port = http_server_port(cls.httpd)
+ cls.server_thread = threading.Thread(target=cls.httpd.serve_forever)
+ cls.server_thread.daemon = True
+ cls.server_thread.start()
+
+ def _run_test(self, handler, **handler_kwargs):
+ with handler(
+ # Disable client-side validation of unacceptable self-signed testcert.pem
+ # The test is of a check on the server side, so unaffected
+ verify=False,
+ **handler_kwargs,
+ ) as rh:
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html')).read().decode()
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_combined_nopass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_nocombined_nopass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'client.crt'),
+ 'client_certificate_key': os.path.join(self.certdir, 'client.key'),
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_combined_pass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
+ 'client_certificate_password': 'foobar',
+ })
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_certificate_nocombined_pass(self, handler):
+ self._run_test(handler, client_cert={
+ 'client_certificate': os.path.join(self.certdir, 'client.crt'),
+ 'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
+ 'client_certificate_password': 'foobar',
+ })
+
+
+class TestUrllibRequestHandler(TestRequestHandlerBase):
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_file_urls(self, handler):
+ # See https://github.com/ytdl-org/youtube-dl/issues/8227
+ tf = tempfile.NamedTemporaryFile(delete=False)
+ tf.write(b'foobar')
+ tf.close()
+ req = Request(pathlib.Path(tf.name).as_uri())
+ with handler() as rh:
+ with pytest.raises(UnsupportedRequest):
+ rh.validate(req)
+
+ # Test that urllib never loaded FileHandler
+ with pytest.raises(TransportError):
+ rh.send(req)
+
+ with handler(enable_file_urls=True) as rh:
+ res = validate_and_send(rh, req)
+ assert res.read() == b'foobar'
+ res.close()
+
+ os.unlink(tf.name)
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_http_error_returns_content(self, handler):
+ # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
+ def get_response():
+ with handler() as rh:
+ # headers url
+ try:
+ validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
+ except HTTPError as e:
+ return e.response
+
+ assert get_response().read() == b'<html></html>'
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_verify_cert_error_text(self, handler):
+ # Check the output of the error message
+ with handler() as rh:
+ with pytest.raises(
+ CertificateVerifyError,
+ match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate'
+ ):
+ validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
+
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ @pytest.mark.parametrize('req,match,version_check', [
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
+ # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
+ (
+ Request('http://127.0.0.1', method='GET\n'),
+ 'method can\'t contain control characters',
+ lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5)
+ ),
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
+ # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
+ (
+ Request('http://127.0.0. 1', method='GET'),
+ 'URL can\'t contain control characters',
+ lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3)
+ ),
+ # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
+ (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
+ ])
+ def test_httplib_validation_errors(self, handler, req, match, version_check):
+ if version_check and version_check(sys.version_info):
+ pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')
+
+ with handler() as rh:
+ with pytest.raises(RequestError, match=match) as exc_info:
+ validate_and_send(rh, req)
+ assert not isinstance(exc_info.value, TransportError)
+
+
+def run_validation(handler, error, req, **handler_kwargs):
+ with handler(**handler_kwargs) as rh:
+ if error:
+ with pytest.raises(error):
+ rh.validate(req)
+ else:
+ rh.validate(req)
+
+
+class TestRequestHandlerValidation:
+
+ class ValidationRH(RequestHandler):
+ def _send(self, request):
+ raise RequestError('test')
+
+ class NoCheckRH(ValidationRH):
+ _SUPPORTED_FEATURES = None
+ _SUPPORTED_PROXY_SCHEMES = None
+ _SUPPORTED_URL_SCHEMES = None
+
+ def _check_extensions(self, extensions):
+ extensions.clear()
+
+ class HTTPSupportedRH(ValidationRH):
+ _SUPPORTED_URL_SCHEMES = ('http',)
+
+ URL_SCHEME_TESTS = [
+ # scheme, expected to fail, handler kwargs
+ ('Urllib', [
+ ('http', False, {}),
+ ('https', False, {}),
+ ('data', False, {}),
+ ('ftp', False, {}),
+ ('file', UnsupportedRequest, {}),
+ ('file', False, {'enable_file_urls': True}),
+ ]),
+ (NoCheckRH, [('http', False, {})]),
+ (ValidationRH, [('http', UnsupportedRequest, {})])
+ ]
+
+ PROXY_SCHEME_TESTS = [
+ # scheme, expected to fail
+ ('Urllib', [
+ ('http', False),
+ ('https', UnsupportedRequest),
+ ('socks4', False),
+ ('socks4a', False),
+ ('socks5', False),
+ ('socks5h', False),
+ ('socks', UnsupportedRequest),
+ ]),
+ (NoCheckRH, [('http', False)]),
+ (HTTPSupportedRH, [('http', UnsupportedRequest)]),
+ ]
+
+ PROXY_KEY_TESTS = [
+ # key, expected to fail
+ ('Urllib', [
+ ('all', False),
+ ('unrelated', False),
+ ]),
+ (NoCheckRH, [('all', False)]),
+ (HTTPSupportedRH, [('all', UnsupportedRequest)]),
+ (HTTPSupportedRH, [('no', UnsupportedRequest)]),
+ ]
+
+ EXTENSION_TESTS = [
+ ('Urllib', [
+ ({'cookiejar': 'notacookiejar'}, AssertionError),
+ ({'cookiejar': YoutubeDLCookieJar()}, False),
+ ({'cookiejar': CookieJar()}, AssertionError),
+ ({'timeout': 1}, False),
+ ({'timeout': 'notatimeout'}, AssertionError),
+ ({'unsupported': 'value'}, UnsupportedRequest),
+ ]),
+ (NoCheckRH, [
+ ({'cookiejar': 'notacookiejar'}, False),
+ ({'somerandom': 'test'}, False), # but any extension is allowed through
+ ]),
+ ]
+
+ @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
+ (handler_tests[0], scheme, fail, handler_kwargs)
+ for handler_tests in URL_SCHEME_TESTS
+ for scheme, fail, handler_kwargs in handler_tests[1]
+
+ ], indirect=['handler'])
+ def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
+ run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))
+
+ @pytest.mark.parametrize('handler,fail', [('Urllib', False)], indirect=['handler'])
+ def test_no_proxy(self, handler, fail):
+ run_validation(handler, fail, Request('http://', proxies={'no': '127.0.0.1,github.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={'no': '127.0.0.1,github.com'})
+
+ @pytest.mark.parametrize('handler,proxy_key,fail', [
+ (handler_tests[0], proxy_key, fail)
+ for handler_tests in PROXY_KEY_TESTS
+ for proxy_key, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_proxy_key(self, handler, proxy_key, fail):
+ run_validation(handler, fail, Request('http://', proxies={proxy_key: 'http://example.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={proxy_key: 'http://example.com'})
+
+ @pytest.mark.parametrize('handler,scheme,fail', [
+ (handler_tests[0], scheme, fail)
+ for handler_tests in PROXY_SCHEME_TESTS
+ for scheme, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_proxy_scheme(self, handler, scheme, fail):
+ run_validation(handler, fail, Request('http://', proxies={'http': f'{scheme}://example.com'}))
+ run_validation(handler, fail, Request('http://'), proxies={'http': f'{scheme}://example.com'})
+
+ @pytest.mark.parametrize('handler', ['Urllib', HTTPSupportedRH], indirect=True)
+ def test_empty_proxy(self, handler):
+ run_validation(handler, False, Request('http://', proxies={'http': None}))
+ run_validation(handler, False, Request('http://'), proxies={'http': None})
+
+ @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
+ @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
+ def test_invalid_proxy_url(self, handler, proxy_url):
+ run_validation(handler, UnsupportedRequest, Request('http://', proxies={'http': proxy_url}))
+
+ @pytest.mark.parametrize('handler,extensions,fail', [
+ (handler_tests[0], extensions, fail)
+ for handler_tests in EXTENSION_TESTS
+ for extensions, fail in handler_tests[1]
+ ], indirect=['handler'])
+ def test_extension(self, handler, extensions, fail):
+ run_validation(
+ handler, fail, Request('http://', extensions=extensions))
+
+ def test_invalid_request_type(self):
+ rh = self.ValidationRH(logger=FakeLogger())
+ for method in (rh.validate, rh.send):
+ with pytest.raises(TypeError, match='Expected an instance of Request'):
+ method('not a request')
+
+
+class FakeResponse(Response):
+ def __init__(self, request):
+ # XXX: we could make request part of standard response interface
+ self.request = request
+ super().__init__(fp=io.BytesIO(b''), headers={}, url=request.url)
+
+
+class FakeRH(RequestHandler):
+
+ def _validate(self, request):
+ return
+
+ def _send(self, request: Request):
+ if request.url.startswith('ssl://'):
+ raise SSLError(request.url[len('ssl://'):])
+ return FakeResponse(request)
+
+
+class FakeRHYDL(FakeYDL):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._request_director = self.build_request_director([FakeRH])
+
+
+class TestRequestDirector:
+
+ def test_handler_operations(self):
+ director = RequestDirector(logger=FakeLogger())
+ handler = FakeRH(logger=FakeLogger())
+ director.add_handler(handler)
+ assert director.handlers.get(FakeRH.RH_KEY) is handler
+
+ # Handler should overwrite
+ handler2 = FakeRH(logger=FakeLogger())
+ director.add_handler(handler2)
+ assert director.handlers.get(FakeRH.RH_KEY) is not handler
+ assert director.handlers.get(FakeRH.RH_KEY) is handler2
+ assert len(director.handlers) == 1
+
+ class AnotherFakeRH(FakeRH):
+ pass
+ director.add_handler(AnotherFakeRH(logger=FakeLogger()))
+ assert len(director.handlers) == 2
+ assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY
+
+ director.handlers.pop(FakeRH.RH_KEY, None)
+ assert director.handlers.get(FakeRH.RH_KEY) is None
+ assert len(director.handlers) == 1
+
+ # RequestErrors should passthrough
+ with pytest.raises(SSLError):
+ director.send(Request('ssl://something'))
+
+ def test_send(self):
+ director = RequestDirector(logger=FakeLogger())
+ with pytest.raises(RequestError):
+ director.send(Request('any://'))
+ director.add_handler(FakeRH(logger=FakeLogger()))
+ assert isinstance(director.send(Request('http://')), FakeResponse)
+
+ def test_unsupported_handlers(self):
+ class SupportedRH(RequestHandler):
+ _SUPPORTED_URL_SCHEMES = ['http']
+
+ def _send(self, request: Request):
+ return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
+
+ director = RequestDirector(logger=FakeLogger())
+ director.add_handler(SupportedRH(logger=FakeLogger()))
+ director.add_handler(FakeRH(logger=FakeLogger()))
+
+ # First should take preference
+ assert director.send(Request('http://')).read() == b'supported'
+ assert director.send(Request('any://')).read() == b''
+
+ director.handlers.pop(FakeRH.RH_KEY)
+ with pytest.raises(NoSupportingHandlers):
+ director.send(Request('any://'))
+
+ def test_unexpected_error(self):
+ director = RequestDirector(logger=FakeLogger())
+
+ class UnexpectedRH(FakeRH):
+ def _send(self, request: Request):
+ raise TypeError('something')
+
+ director.add_handler(UnexpectedRH(logger=FakeLogger))
+ with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
+ director.send(Request('any://'))
+
+ director.handlers.clear()
+ assert len(director.handlers) == 0
+
+ # Should not be fatal
+ director.add_handler(FakeRH(logger=FakeLogger()))
+ director.add_handler(UnexpectedRH(logger=FakeLogger))
+ assert director.send(Request('any://'))
+
+ def test_preference(self):
+ director = RequestDirector(logger=FakeLogger())
+ director.add_handler(FakeRH(logger=FakeLogger()))
+
+ class SomeRH(RequestHandler):
+ _SUPPORTED_URL_SCHEMES = ['http']
+
+ def _send(self, request: Request):
+ return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)
+
+ def some_preference(rh, request):
+ return (0 if not isinstance(rh, SomeRH)
+ else 100 if 'prefer' in request.headers
+ else -1)
+
+ director.add_handler(SomeRH(logger=FakeLogger()))
+ director.preferences.add(some_preference)
+
+ assert director.send(Request('http://')).read() == b''
+ assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'
+
+
+# XXX: do we want to move this to test_YoutubeDL.py?
+class TestYoutubeDLNetworking:
+
+ @staticmethod
+ def build_handler(ydl, handler: RequestHandler = FakeRH):
+ return ydl.build_request_director([handler]).handlers.get(handler.RH_KEY)
+
+ def test_compat_opener(self):
+ with FakeYDL() as ydl:
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ assert isinstance(ydl._opener, urllib.request.OpenerDirector)
+
+ @pytest.mark.parametrize('proxy,expected', [
+ ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
+ ('', {'all': '__noproxy__'}),
+ (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
+ ])
+ def test_proxy(self, proxy, expected):
+ old_http_proxy = os.environ.get('HTTP_PROXY')
+ try:
+ os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
+ with FakeYDL({'proxy': proxy}) as ydl:
+ assert ydl.proxies == expected
+ finally:
+ if old_http_proxy:
+ os.environ['HTTP_PROXY'] = old_http_proxy
+
+ def test_compat_request(self):
+ with FakeRHYDL() as ydl:
+ assert ydl.urlopen('test://')
+ urllib_req = urllib.request.Request('http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
+ urllib_req.add_unredirected_header('Cookie', 'bob=bob')
+ urllib_req.timeout = 2
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ req = ydl.urlopen(urllib_req).request
+ assert req.url == urllib_req.get_full_url()
+ assert req.data == urllib_req.data
+ assert req.method == urllib_req.get_method()
+ assert 'X-Test' in req.headers
+ assert 'Cookie' in req.headers
+ assert req.extensions.get('timeout') == 2
+
+ with pytest.raises(AssertionError):
+ ydl.urlopen(None)
+
+ def test_extract_basic_auth(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('http://user:pass@foo.bar'))
+ assert res.request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
+
+ def test_sanitize_url(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('httpss://foo.bar'))
+ assert res.request.url == 'https://foo.bar'
+
+ def test_file_urls_error(self):
+ # use urllib handler
+ with FakeYDL() as ydl:
+ with pytest.raises(RequestError, match=r'file:// URLs are disabled by default'):
+ ydl.urlopen('file://')
+
+ def test_legacy_server_connect_error(self):
+ with FakeRHYDL() as ydl:
+ for error in ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE'):
+ with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
+ ydl.urlopen(f'ssl://{error}')
+
+ with pytest.raises(SSLError, match='testerror'):
+ ydl.urlopen('ssl://testerror')
+
+ @pytest.mark.parametrize('proxy_key,proxy_url,expected', [
+ ('http', '__noproxy__', None),
+ ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
+ ('https', 'example.com', 'http://example.com'),
+ ('https', '//example.com', 'http://example.com'),
+ ('https', 'socks5://example.com', 'socks5h://example.com'),
+ ('http', 'socks://example.com', 'socks4://example.com'),
+ ('http', 'socks4://example.com', 'socks4://example.com'),
+ ('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
+ ])
+ def test_clean_proxy(self, proxy_key, proxy_url, expected):
+ # proxies should be cleaned in urlopen()
+ with FakeRHYDL() as ydl:
+ req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
+ assert req.proxies[proxy_key] == expected
+
+ # and should also be cleaned when building the handler
+ env_key = f'{proxy_key.upper()}_PROXY'
+ old_env_proxy = os.environ.get(env_key)
+ try:
+ os.environ[env_key] = proxy_url # ensure that provided proxies override env
+ with FakeYDL() as ydl:
+ rh = self.build_handler(ydl)
+ assert rh.proxies[proxy_key] == expected
+ finally:
+ if old_env_proxy:
+ os.environ[env_key] = old_env_proxy
+
+ def test_clean_proxy_header(self):
+ with FakeRHYDL() as ydl:
+ req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
+ assert 'ytdl-request-proxy' not in req.headers
+ assert req.proxies == {'all': 'http://foo.bar'}
+
+ with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
+ rh = self.build_handler(ydl)
+ assert 'ytdl-request-proxy' not in rh.headers
+ assert rh.proxies == {'all': 'http://foo.bar'}
+
+ def test_clean_header(self):
+ with FakeRHYDL() as ydl:
+ res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
+ assert 'Youtubedl-no-compression' not in res.request.headers
+ assert res.request.headers.get('Accept-Encoding') == 'identity'
+
+ with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
+ rh = self.build_handler(ydl)
+ assert 'Youtubedl-no-compression' not in rh.headers
+ assert rh.headers.get('Accept-Encoding') == 'identity'
+
+ def test_build_handler_params(self):
+ with FakeYDL({
+ 'http_headers': {'test': 'testtest'},
+ 'socket_timeout': 2,
+ 'proxy': 'http://127.0.0.1:8080',
+ 'source_address': '127.0.0.45',
+ 'debug_printtraffic': True,
+ 'compat_opts': ['no-certifi'],
+ 'nocheckcertificate': True,
+ 'legacyserverconnect': True,
+ }) as ydl:
+ rh = self.build_handler(ydl)
+ assert rh.headers.get('test') == 'testtest'
+ assert 'Accept' in rh.headers # ensure std_headers are still there
+ assert rh.timeout == 2
+ assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
+ assert rh.source_address == '127.0.0.45'
+ assert rh.verbose is True
+ assert rh.prefer_system_certs is True
+ assert rh.verify is False
+ assert rh.legacy_ssl_support is True
+
+ @pytest.mark.parametrize('ydl_params', [
+ {'client_certificate': 'fakecert.crt'},
+ {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
+ {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
+ {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
+ ])
+ def test_client_certificate(self, ydl_params):
+ with FakeYDL(ydl_params) as ydl:
+ rh = self.build_handler(ydl)
+ assert rh._client_cert == ydl_params # XXX: Too bound to implementation
+
+ def test_urllib_file_urls(self):
+ with FakeYDL({'enable_file_urls': False}) as ydl:
+ rh = self.build_handler(ydl, UrllibRH)
+ assert rh.enable_file_urls is False
+
+ with FakeYDL({'enable_file_urls': True}) as ydl:
+ rh = self.build_handler(ydl, UrllibRH)
+ assert rh.enable_file_urls is True
+
+
+class TestRequest:
+
+ def test_query(self):
+ req = Request('http://example.com?q=something', query={'v': 'xyz'})
+ assert req.url == 'http://example.com?q=something&v=xyz'
+
+ req.update(query={'v': '123'})
+ assert req.url == 'http://example.com?q=something&v=123'
+ req.update(url='http://example.com', query={'v': 'xyz'})
+ assert req.url == 'http://example.com?v=xyz'
+
+ def test_method(self):
+ req = Request('http://example.com')
+ assert req.method == 'GET'
+ req.data = b'test'
+ assert req.method == 'POST'
+ req.data = None
+ assert req.method == 'GET'
+ req.data = b'test2'
+ req.method = 'PUT'
+ assert req.method == 'PUT'
+ req.data = None
+ assert req.method == 'PUT'
+ with pytest.raises(TypeError):
+ req.method = 1
+
+ def test_request_helpers(self):
+ assert HEADRequest('http://example.com').method == 'HEAD'
+ assert PUTRequest('http://example.com').method == 'PUT'
+
+ def test_headers(self):
+ req = Request('http://example.com', headers={'tesT': 'test'})
+ assert req.headers == HTTPHeaderDict({'test': 'test'})
+ req.update(headers={'teSt2': 'test2'})
+ assert req.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})
+
+ req.headers = new_headers = HTTPHeaderDict({'test': 'test'})
+ assert req.headers == HTTPHeaderDict({'test': 'test'})
+ assert req.headers is new_headers
+
+ # test converts dict to case insensitive dict
+ req.headers = new_headers = {'test2': 'test2'}
+ assert isinstance(req.headers, HTTPHeaderDict)
+ assert req.headers is not new_headers
+
+ with pytest.raises(TypeError):
+ req.headers = None
+
+ def test_data_type(self):
+ req = Request('http://example.com')
+ assert req.data is None
+ # test bytes is allowed
+ req.data = b'test'
+ assert req.data == b'test'
+ # test iterable of bytes is allowed
+ i = [b'test', b'test2']
+ req.data = i
+ assert req.data == i
+
+ # test file-like object is allowed
+ f = io.BytesIO(b'test')
+ req.data = f
+ assert req.data == f
+
+ # common mistake: test str not allowed
+ with pytest.raises(TypeError):
+ req.data = 'test'
+ assert req.data != 'test'
+
+ # common mistake: test dict is not allowed
+ with pytest.raises(TypeError):
+ req.data = {'test': 'test'}
+ assert req.data != {'test': 'test'}
+
+ def test_content_length_header(self):
+ req = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
+ assert req.headers.get('Content-Length') == '0'
+
+ req.data = b'test'
+ assert 'Content-Length' not in req.headers
+
+ req = Request('http://example.com', headers={'Content-Length': '10'})
+ assert 'Content-Length' not in req.headers
+
+ def test_content_type_header(self):
+ req = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
+ assert req.headers.get('Content-Type') == 'test'
+ req.data = b'test2'
+ assert req.headers.get('Content-Type') == 'test'
+ req.data = None
+ assert 'Content-Type' not in req.headers
+ req.data = b'test3'
+ assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
+
+ def test_update_req(self):
+ req = Request('http://example.com')
+ assert req.data is None
+ assert req.method == 'GET'
+ assert 'Content-Type' not in req.headers
+ # Test that zero-byte payloads will be sent
+ req.update(data=b'')
+ assert req.data == b''
+ assert req.method == 'POST'
+ assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
+
+ def test_proxies(self):
+ req = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
+ assert req.proxies == {'http': 'http://127.0.0.1:8080'}
+
+ def test_extensions(self):
+ req = Request(url='http://example.com', extensions={'timeout': 2})
+ assert req.extensions == {'timeout': 2}
+
+ def test_copy(self):
+ req = Request(
+ url='http://example.com',
+ extensions={'cookiejar': CookieJar()},
+ headers={'Accept-Encoding': 'br'},
+ proxies={'http': 'http://127.0.0.1'},
+ data=[b'123']
+ )
+ req_copy = req.copy()
+ assert req_copy is not req
+ assert req_copy.url == req.url
+ assert req_copy.headers == req.headers
+ assert req_copy.headers is not req.headers
+ assert req_copy.proxies == req.proxies
+ assert req_copy.proxies is not req.proxies
+
+ # Data is not able to be copied
+ assert req_copy.data == req.data
+ assert req_copy.data is req.data
+
+ # Shallow copy extensions
+ assert req_copy.extensions is not req.extensions
+ assert req_copy.extensions['cookiejar'] == req.extensions['cookiejar']
+
+ # Subclasses are copied by default
+ class AnotherRequest(Request):
+ pass
+
+ req = AnotherRequest(url='http://127.0.0.1')
+ assert isinstance(req.copy(), AnotherRequest)
+
+ def test_url(self):
+ req = Request(url='https://фtest.example.com/ some spaceв?ä=c',)
+ assert req.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'
+
+ assert Request(url='//example.com').url == 'http://example.com'
+
+ with pytest.raises(TypeError):
+ Request(url='https://').url = None
+
+
+class TestResponse:
+
+ @pytest.mark.parametrize('reason,status,expected', [
+ ('custom', 200, 'custom'),
+ (None, 404, 'Not Found'), # fallback status
+ ('', 403, 'Forbidden'),
+ (None, 999, None)
+ ])
+ def test_reason(self, reason, status, expected):
+ res = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
+ assert res.reason == expected
+
+ def test_headers(self):
+ headers = Message()
+ headers.add_header('Test', 'test')
+ headers.add_header('Test', 'test2')
+ headers.add_header('content-encoding', 'br')
+ res = Response(io.BytesIO(b''), headers=headers, url='test://')
+ assert res.headers.get_all('test') == ['test', 'test2']
+ assert 'Content-Encoding' in res.headers
+
+ def test_get_header(self):
+ headers = Message()
+ headers.add_header('Set-Cookie', 'cookie1')
+ headers.add_header('Set-cookie', 'cookie2')
+ headers.add_header('Test', 'test')
+ headers.add_header('Test', 'test2')
+ res = Response(io.BytesIO(b''), headers=headers, url='test://')
+ assert res.get_header('test') == 'test, test2'
+ assert res.get_header('set-Cookie') == 'cookie1'
+ assert res.get_header('notexist', 'default') == 'default'
+
+ def test_compat(self):
+ res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', category=DeprecationWarning)
+ assert res.code == res.getcode() == res.status
+ assert res.geturl() == res.url
+ assert res.info() is res.headers
+ assert res.getheader('test') == res.get_header('test')
diff --git a/test/test_networking_utils.py b/test/test_networking_utils.py
new file mode 100644
index 000000000..dbf656090
--- /dev/null
+++ b/test/test_networking_utils.py
@@ -0,0 +1,282 @@
+#!/usr/bin/env python3
+
+# Allow direct execution
+import os
+import sys
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import contextlib
+import io
+import platform
+import random
+import ssl
+import urllib.error
+import warnings
+
+from yt_dlp.cookies import YoutubeDLCookieJar
+from yt_dlp.dependencies import certifi
+from yt_dlp.networking import Response
+from yt_dlp.networking._helper import (
+ InstanceStoreMixin,
+ add_accept_encoding_header,
+ get_redirect_method,
+ make_socks_proxy_opts,
+ select_proxy,
+ ssl_load_certs,
+)
+from yt_dlp.networking.exceptions import (
+ HTTPError,
+ IncompleteRead,
+ _CompatHTTPError,
+)
+from yt_dlp.socks import ProxyType
+from yt_dlp.utils.networking import HTTPHeaderDict
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+class TestNetworkingUtils:
+
+ def test_select_proxy(self):
+ proxies = {
+ 'all': 'socks5://example.com',
+ 'http': 'http://example.com:1080',
+ 'no': 'bypass.example.com,yt-dl.org'
+ }
+
+ assert select_proxy('https://example.com', proxies) == proxies['all']
+ assert select_proxy('http://example.com', proxies) == proxies['http']
+ assert select_proxy('http://bypass.example.com', proxies) is None
+ assert select_proxy('https://yt-dl.org', proxies) is None
+
+ @pytest.mark.parametrize('socks_proxy,expected', [
+ ('socks5h://example.com', {
+ 'proxytype': ProxyType.SOCKS5,
+ 'addr': 'example.com',
+ 'port': 1080,
+ 'rdns': True,
+ 'username': None,
+ 'password': None
+ }),
+ ('socks5://user:@example.com:5555', {
+ 'proxytype': ProxyType.SOCKS5,
+ 'addr': 'example.com',
+ 'port': 5555,
+ 'rdns': False,
+ 'username': 'user',
+ 'password': ''
+ }),
+ ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
+ 'proxytype': ProxyType.SOCKS4,
+ 'addr': '127.0.0.1',
+ 'port': 1080,
+ 'rdns': False,
+ 'username': 'u@ser',
+ 'password': 'pa ss'
+ }),
+ ('socks4a://:pa%20ss@127.0.0.1', {
+ 'proxytype': ProxyType.SOCKS4A,
+ 'addr': '127.0.0.1',
+ 'port': 1080,
+ 'rdns': True,
+ 'username': '',
+ 'password': 'pa ss'
+ })
+ ])
+ def test_make_socks_proxy_opts(self, socks_proxy, expected):
+ assert make_socks_proxy_opts(socks_proxy) == expected
+
+ def test_make_socks_proxy_unknown(self):
+ with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
+ make_socks_proxy_opts('socks://127.0.0.1')
+
+ @pytest.mark.skipif(not certifi, reason='certifi is not installed')
+ def test_load_certifi(self):
+ context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context_certifi.load_verify_locations(cafile=certifi.where())
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ssl_load_certs(context, use_certifi=True)
+ assert context.get_ca_certs() == context_certifi.get_ca_certs()
+
+ context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context_default.load_default_certs()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ssl_load_certs(context, use_certifi=False)
+ assert context.get_ca_certs() == context_default.get_ca_certs()
+
+ if context_default.get_ca_certs() == context_certifi.get_ca_certs():
+ pytest.skip('System uses certifi as default. The test is not valid')
+
+ @pytest.mark.parametrize('method,status,expected', [
+ ('GET', 303, 'GET'),
+ ('HEAD', 303, 'HEAD'),
+ ('PUT', 303, 'GET'),
+ ('POST', 301, 'GET'),
+ ('HEAD', 301, 'HEAD'),
+ ('POST', 302, 'GET'),
+ ('HEAD', 302, 'HEAD'),
+ ('PUT', 302, 'PUT'),
+ ('POST', 308, 'POST'),
+ ('POST', 307, 'POST'),
+ ('HEAD', 308, 'HEAD'),
+ ('HEAD', 307, 'HEAD'),
+ ])
+ def test_get_redirect_method(self, method, status, expected):
+ assert get_redirect_method(method, status) == expected
+
+ @pytest.mark.parametrize('headers,supported_encodings,expected', [
+ ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
+ ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
+ ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
+ ])
+ def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
+ headers = HTTPHeaderDict(headers)
+ add_accept_encoding_header(headers, supported_encodings)
+ assert headers == HTTPHeaderDict(expected)
+
+
+class TestInstanceStoreMixin:
+
+ class FakeInstanceStoreMixin(InstanceStoreMixin):
+ def _create_instance(self, **kwargs):
+ return random.randint(0, 1000000)
+
+ def _close_instance(self, instance):
+ pass
+
+ def test_mixin(self):
+ mixin = self.FakeInstanceStoreMixin()
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
+
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
+
+ assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
+
+ assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
+
+ assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
+
+ cookiejar = YoutubeDLCookieJar()
+ assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
+
+ assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
+
+ # Different order
+ assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
+
+ m = mixin._get_instance(t=1234)
+ assert mixin._get_instance(t=1234) == m
+ mixin._clear_instances()
+ assert mixin._get_instance(t=1234) != m
+
+
+class TestNetworkingExceptions:
+
+ @staticmethod
+ def create_response(status):
+ return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
+
+ @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
+ def test_http_error(self, http_error_class):
+
+ response = self.create_response(403)
+ error = http_error_class(response)
+
+ assert error.status == 403
+ assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
+ assert error.reason == response.reason
+ assert error.response is response
+
+ data = error.response.read()
+ assert data == b'test'
+ assert repr(error) == '<HTTPError 403: Forbidden>'
+
+ @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
+ def test_redirect_http_error(self, http_error_class):
+ response = self.create_response(301)
+ error = http_error_class(response, redirect_loop=True)
+ assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
+ assert error.reason == 'Moved Permanently'
+
+ def test_compat_http_error(self):
+ response = self.create_response(403)
+ error = _CompatHTTPError(HTTPError(response))
+ assert isinstance(error, HTTPError)
+ assert isinstance(error, urllib.error.HTTPError)
+
+ @contextlib.contextmanager
+ def raises_deprecation_warning():
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ yield
+
+ if len(w) == 0:
+ pytest.fail('Did not raise DeprecationWarning')
+ if len(w) > 1:
+ pytest.fail(f'Raised multiple warnings: {w}')
+
+ if not issubclass(w[-1].category, DeprecationWarning):
+ pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
+ w.clear()
+
+ with raises_deprecation_warning():
+ assert error.code == 403
+
+ with raises_deprecation_warning():
+ assert error.getcode() == 403
+
+ with raises_deprecation_warning():
+ assert error.hdrs is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.info() is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.headers is error.response.headers
+
+ with raises_deprecation_warning():
+ assert error.filename == error.response.url
+
+ with raises_deprecation_warning():
+ assert error.url == error.response.url
+
+ with raises_deprecation_warning():
+ assert error.geturl() == error.response.url
+
+ # Passthrough file operations
+ with raises_deprecation_warning():
+ assert error.read() == b'test'
+
+ with raises_deprecation_warning():
+ assert not error.closed
+
+ with raises_deprecation_warning():
+ # Technically Response operations are also passed through, which should not be used.
+ assert error.get_header('test') == 'test'
+
+ # Should not raise a warning
+ error.close()
+
+ @pytest.mark.skipif(
+ platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
+ def test_compat_http_error_autoclose(self):
+ # Compat HTTPError should not autoclose response
+ response = self.create_response(403)
+ _CompatHTTPError(HTTPError(response))
+ assert not response.closed
+
+ def test_incomplete_read_error(self):
+ error = IncompleteRead(b'test', 3, cause='test')
+ assert isinstance(error, IncompleteRead)
+ assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
+ assert str(error) == error.msg == '4 bytes read, 3 more expected'
+ assert error.partial == b'test'
+ assert error.expected == 3
+ assert error.cause == 'test'
+
+ error = IncompleteRead(b'aaa')
+ assert repr(error) == '<IncompleteRead: 3 bytes read>'
+ assert str(error) == '3 bytes read'
diff --git a/test/test_plugins.py b/test/test_plugins.py
new file mode 100644
index 000000000..6cde579e1
--- /dev/null
+++ b/test/test_plugins.py
@@ -0,0 +1,73 @@
+import importlib
+import os
+import shutil
+import sys
+import unittest
+from pathlib import Path
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+TEST_DATA_DIR = Path(os.path.dirname(os.path.abspath(__file__)), 'testdata')
+sys.path.append(str(TEST_DATA_DIR))
+importlib.invalidate_caches()
+
+from yt_dlp.plugins import PACKAGE_NAME, directories, load_plugins
+
+
+class TestPlugins(unittest.TestCase):
+
+ TEST_PLUGIN_DIR = TEST_DATA_DIR / PACKAGE_NAME
+
+ def test_directories_containing_plugins(self):
+ self.assertIn(self.TEST_PLUGIN_DIR, map(Path, directories()))
+
+ def test_extractor_classes(self):
+ for module_name in tuple(sys.modules):
+ if module_name.startswith(f'{PACKAGE_NAME}.extractor'):
+ del sys.modules[module_name]
+ plugins_ie = load_plugins('extractor', 'IE')
+
+ self.assertIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
+ self.assertIn('NormalPluginIE', plugins_ie.keys())
+
+ # don't load modules with underscore prefix
+ self.assertFalse(
+ f'{PACKAGE_NAME}.extractor._ignore' in sys.modules.keys(),
+ 'loaded module beginning with underscore')
+ self.assertNotIn('IgnorePluginIE', plugins_ie.keys())
+
+ # Don't load extractors with underscore prefix
+ self.assertNotIn('_IgnoreUnderscorePluginIE', plugins_ie.keys())
+
+ # Don't load extractors not specified in __all__ (if supplied)
+ self.assertNotIn('IgnoreNotInAllPluginIE', plugins_ie.keys())
+ self.assertIn('InAllPluginIE', plugins_ie.keys())
+
+ def test_postprocessor_classes(self):
+ plugins_pp = load_plugins('postprocessor', 'PP')
+ self.assertIn('NormalPluginPP', plugins_pp.keys())
+
+ def test_importing_zipped_module(self):
+ zip_path = TEST_DATA_DIR / 'zipped_plugins.zip'
+ shutil.make_archive(str(zip_path)[:-4], 'zip', str(zip_path)[:-4])
+ sys.path.append(str(zip_path)) # add zip to search paths
+ importlib.invalidate_caches() # reset the import caches
+
+ try:
+ for plugin_type in ('extractor', 'postprocessor'):
+ package = importlib.import_module(f'{PACKAGE_NAME}.{plugin_type}')
+ self.assertIn(zip_path / PACKAGE_NAME / plugin_type, map(Path, package.__path__))
+
+ plugins_ie = load_plugins('extractor', 'IE')
+ self.assertIn('ZippedPluginIE', plugins_ie.keys())
+
+ plugins_pp = load_plugins('postprocessor', 'PP')
+ self.assertIn('ZippedPluginPP', plugins_pp.keys())
+
+ finally:
+ sys.path.remove(str(zip_path))
+ os.remove(zip_path)
+ importlib.invalidate_caches() # reset the import caches
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_socks.py b/test/test_socks.py
index 6651290d2..95ffce275 100644
--- a/test/test_socks.py
+++ b/test/test_socks.py
@@ -1,113 +1,470 @@
#!/usr/bin/env python3
-
# Allow direct execution
import os
import sys
+import threading
import unittest
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+import pytest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+import abc
+import contextlib
+import enum
+import functools
+import http.server
+import json
import random
-import subprocess
-import urllib.request
+import socket
+import struct
+import time
+from socketserver import (
+ BaseRequestHandler,
+ StreamRequestHandler,
+ ThreadingTCPServer,
+)
-from test.helper import FakeYDL, get_params, is_download_test
+from test.helper import http_server_port
+from yt_dlp.networking import Request
+from yt_dlp.networking.exceptions import ProxyError, TransportError
+from yt_dlp.socks import (
+ SOCKS4_REPLY_VERSION,
+ SOCKS4_VERSION,
+ SOCKS5_USER_AUTH_SUCCESS,
+ SOCKS5_USER_AUTH_VERSION,
+ SOCKS5_VERSION,
+ Socks5AddressType,
+ Socks5Auth,
+)
+SOCKS5_USER_AUTH_FAILURE = 0x1
-@is_download_test
-class TestMultipleSocks(unittest.TestCase):
- @staticmethod
- def _check_params(attrs):
- params = get_params()
- for attr in attrs:
- if attr not in params:
- print('Missing %s. Skipping.' % attr)
- return
- return params
- def test_proxy_http(self):
- params = self._check_params(['primary_proxy', 'primary_server_ip'])
- if params is None:
+class Socks4CD(enum.IntEnum):
+ REQUEST_GRANTED = 90
+ REQUEST_REJECTED_OR_FAILED = 91
+ REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
+ REQUEST_REJECTED_DIFFERENT_USERID = 93
+
+
+class Socks5Reply(enum.IntEnum):
+ SUCCEEDED = 0x0
+ GENERAL_FAILURE = 0x1
+ CONNECTION_NOT_ALLOWED = 0x2
+ NETWORK_UNREACHABLE = 0x3
+ HOST_UNREACHABLE = 0x4
+ CONNECTION_REFUSED = 0x5
+ TTL_EXPIRED = 0x6
+ COMMAND_NOT_SUPPORTED = 0x7
+ ADDRESS_TYPE_NOT_SUPPORTED = 0x8
+
+
+class SocksTestRequestHandler(BaseRequestHandler):
+
+ def __init__(self, *args, socks_info=None, **kwargs):
+ self.socks_info = socks_info
+ super().__init__(*args, **kwargs)
+
+
+class SocksProxyHandler(BaseRequestHandler):
+ def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
+ self.socks_kwargs = socks_server_kwargs or {}
+ self.request_handler_class = request_handler_class
+ super().__init__(*args, **kwargs)
+
+
+class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
+
+ # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
+ # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
+
+ def handle(self):
+ sleep = self.socks_kwargs.get('sleep')
+ if sleep:
+ time.sleep(sleep)
+ version, nmethods = self.connection.recv(2)
+ assert version == SOCKS5_VERSION
+ methods = list(self.connection.recv(nmethods))
+
+ auth = self.socks_kwargs.get('auth')
+
+ if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL({
- 'proxy': params['primary_proxy']
- })
- self.assertEqual(
- ydl.urlopen('http://yt-dl.org/ip').read().decode(),
- params['primary_server_ip'])
-
- def test_proxy_https(self):
- params = self._check_params(['primary_proxy', 'primary_server_ip'])
- if params is None:
+
+ elif Socks5Auth.AUTH_USER_PASS in methods:
+ self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
+
+ _, user_len = struct.unpack('!BB', self.connection.recv(2))
+ username = self.connection.recv(user_len).decode()
+ pass_len = ord(self.connection.recv(1))
+ password = self.connection.recv(pass_len).decode()
+
+ if username == auth[0] and password == auth[1]:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
+ else:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
+ self.server.close_request(self.request)
+ return
+
+ elif Socks5Auth.AUTH_NONE in methods:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
+ else:
+ self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL({
- 'proxy': params['primary_proxy']
- })
- self.assertEqual(
- ydl.urlopen('https://yt-dl.org/ip').read().decode(),
- params['primary_server_ip'])
-
- def test_secondary_proxy_http(self):
- params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
- if params is None:
+
+ version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
+ socks_info = {
+ 'version': version,
+ 'auth_methods': methods,
+ 'command': command,
+ 'client_address': self.client_address,
+ 'ipv4_address': None,
+ 'domain_address': None,
+ 'ipv6_address': None,
+ }
+ if address_type == Socks5AddressType.ATYP_IPV4:
+ socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
+ elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
+ socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
+ elif address_type == Socks5AddressType.ATYP_IPV6:
+ socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
+ else:
+ self.server.close_request(self.request)
+
+ socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
+
+ # dummy response, the returned IP is just a placeholder
+ self.connection.sendall(struct.pack(
+ '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
+
+ self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
+
+
+class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
+
+ # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
+ # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
+
+ def _read_until_null(self):
+ return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
+
+ def handle(self):
+ sleep = self.socks_kwargs.get('sleep')
+ if sleep:
+ time.sleep(sleep)
+ socks_info = {
+ 'version': SOCKS4_VERSION,
+ 'command': None,
+ 'client_address': self.client_address,
+ 'ipv4_address': None,
+ 'port': None,
+ 'domain_address': None,
+ }
+ version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
+ socks_info['port'] = dest_port
+ socks_info['command'] = command
+ if version != SOCKS4_VERSION:
+ self.server.close_request(self.request)
return
- ydl = FakeYDL()
- req = urllib.request.Request('http://yt-dl.org/ip')
- req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
- self.assertEqual(
- ydl.urlopen(req).read().decode(),
- params['secondary_server_ip'])
-
- def test_secondary_proxy_https(self):
- params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
- if params is None:
+ use_remote_dns = False
+ if 0x0 < dest_ip <= 0xFF:
+ use_remote_dns = True
+ else:
+ socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
+
+ user_id = self._read_until_null().decode()
+ if user_id != (self.socks_kwargs.get('user_id') or ''):
+ self.connection.sendall(struct.pack(
+ '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
+ self.server.close_request(self.request)
return
- ydl = FakeYDL()
- req = urllib.request.Request('https://yt-dl.org/ip')
- req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
- self.assertEqual(
- ydl.urlopen(req).read().decode(),
- params['secondary_server_ip'])
+ if use_remote_dns:
+ socks_info['domain_address'] = self._read_until_null().decode()
-@is_download_test
-class TestSocks(unittest.TestCase):
- _SKIP_SOCKS_TEST = True
+ # dummy response, the returned IP is just a placeholder
+ self.connection.sendall(
+ struct.pack(
+ '!BBHI', SOCKS4_REPLY_VERSION,
+ self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
- def setUp(self):
- if self._SKIP_SOCKS_TEST:
- return
+ self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
- self.port = random.randint(20000, 30000)
- self.server_process = subprocess.Popen([
- 'srelay', '-f', '-i', '127.0.0.1:%d' % self.port],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- def tearDown(self):
- if self._SKIP_SOCKS_TEST:
- return
+class IPv6ThreadingTCPServer(ThreadingTCPServer):
+ address_family = socket.AF_INET6
+
+
+class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
+ def do_GET(self):
+ if self.path == '/socks_info':
+ payload = json.dumps(self.socks_info.copy())
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload.encode())
+
+
+@contextlib.contextmanager
+def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
+ server = server_thread = None
+ try:
+ bind_address = bind_ip or '127.0.0.1'
+ server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
+ server = server_type(
+ (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
+ server_port = http_server_port(server)
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.daemon = True
+ server_thread.start()
+ if '.' not in bind_address:
+ yield f'[{bind_address}]:{server_port}'
+ else:
+ yield f'{bind_address}:{server_port}'
+ finally:
+ server.shutdown()
+ server.server_close()
+ server_thread.join(2.0)
+
+
+class SocksProxyTestContext(abc.ABC):
+ REQUEST_HANDLER_CLASS = None
+
+ def socks_server(self, server_class, *args, **kwargs):
+ return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
+
+ @abc.abstractmethod
+ def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
+ """return a dict of socks_info"""
+
+
+class HTTPSocksTestProxyContext(SocksProxyTestContext):
+ REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
+
+ def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
+ request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
+ handler.validate(request)
+ return json.loads(handler.send(request).read().decode())
+
+
+CTX_MAP = {
+ 'http': HTTPSocksTestProxyContext,
+}
+
+
+@pytest.fixture(scope='module')
+def ctx(request):
+ return CTX_MAP[request.param]()
+
+
+class TestSocks4Proxy:
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4_no_auth(self, handler, ctx):
+ with handler() as rh:
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks4://{server_address}'})
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4_auth(self, handler, ctx):
+ with handler() as rh:
+ with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks4://user:@{server_address}'})
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='socks4a implementation currently broken when destination is not a domain name'))
+ ], indirect=True)
+ def test_socks4a_ipv4_target(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['version'] == 4
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['domain_address'] is None
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks4a_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['version'] == 4
+ assert response['ipv4_address'] is None
+ assert response['domain_address'] == 'localhost'
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='source_address is not yet supported for socks4 proxies'))
+ ], indirect=True)
+ def test_ipv4_client_source_address(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(proxies={'all': f'socks4://{server_address}'},
+ source_address=source_address) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['client_address'][0] == source_address
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ @pytest.mark.parametrize('reply_code', [
+ Socks4CD.REQUEST_REJECTED_OR_FAILED,
+ Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
+ Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
+ ])
+ def test_socks4_errors(self, handler, ctx, reply_code):
+ with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh)
+
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 socks4 proxies are not yet supported'))
+ ], indirect=True)
+ def test_ipv6_socks4_proxy(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['client_address'][0] == '::1'
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 4
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_timeout(self, handler, ctx):
+ with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
+ with handler(proxies={'all': f'socks4://{server_address}'}, timeout=1) as rh:
+ with pytest.raises(TransportError):
+ ctx.socks_info_request(rh)
+
+
+class TestSocks5Proxy:
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_no_auth(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['auth_methods'] == [0x0]
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_user_pass(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
+ with handler() as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
+
+ response = ctx.socks_info_request(
+ rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
+
+ assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_ipv4_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
+
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5h_domain_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='localhost')
+ assert response['ipv4_address'] is None
+ assert response['domain_address'] == 'localhost'
+ assert response['version'] == 5
- self.server_process.terminate()
- self.server_process.communicate()
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_socks5h_ip_target(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['domain_address'] is None
+ assert response['version'] == 5
- def _get_ip(self, protocol):
- if self._SKIP_SOCKS_TEST:
- return '127.0.0.1'
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 destination addresses are not yet supported'))
+ ], indirect=True)
+ def test_socks5_ipv6_destination(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='[::1]')
+ assert response['ipv6_address'] == '::1'
+ assert response['port'] == 80
+ assert response['version'] == 5
- ydl = FakeYDL({
- 'proxy': '%s://127.0.0.1:%d' % (protocol, self.port),
- })
- return ydl.urlopen('http://yt-dl.org/ip').read().decode()
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='IPv6 socks5 proxies are not yet supported'))
+ ], indirect=True)
+ def test_ipv6_socks5_proxy(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
+ assert response['client_address'][0] == '::1'
+ assert response['ipv4_address'] == '127.0.0.1'
+ assert response['version'] == 5
- def test_socks4(self):
- self.assertTrue(isinstance(self._get_ip('socks4'), str))
+ # XXX: is there any feasible way of testing IPv6 source addresses?
+ # Same would go for non-proxy source_address test...
+ @pytest.mark.parametrize('handler,ctx', [
+ pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
+ reason='source_address is not yet supported for socks5 proxies'))
+ ], indirect=True)
+ def test_ipv4_client_source_address(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
+ response = ctx.socks_info_request(rh)
+ assert response['client_address'][0] == source_address
+ assert response['version'] == 5
- def test_socks4a(self):
- self.assertTrue(isinstance(self._get_ip('socks4a'), str))
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ @pytest.mark.parametrize('reply_code', [
+ Socks5Reply.GENERAL_FAILURE,
+ Socks5Reply.CONNECTION_NOT_ALLOWED,
+ Socks5Reply.NETWORK_UNREACHABLE,
+ Socks5Reply.HOST_UNREACHABLE,
+ Socks5Reply.CONNECTION_REFUSED,
+ Socks5Reply.TTL_EXPIRED,
+ Socks5Reply.COMMAND_NOT_SUPPORTED,
+ Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
+ ])
+ def test_socks5_errors(self, handler, ctx, reply_code):
+ with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
+ with pytest.raises(ProxyError):
+ ctx.socks_info_request(rh)
- def test_socks5(self):
- self.assertTrue(isinstance(self._get_ip('socks5'), str))
+ @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
+ def test_timeout(self, handler, ctx):
+ with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
+ with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
+ with pytest.raises(TransportError):
+ ctx.socks_info_request(rh)
if __name__ == '__main__':
diff --git a/test/test_utils.py b/test/test_utils.py
index 49ab3796b..91e3ffd39 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -5,6 +5,7 @@ import os
import re
import sys
import unittest
+import warnings
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -46,10 +47,9 @@ from yt_dlp.utils import (
encode_base_n,
encode_compat_str,
encodeFilename,
- escape_rfc3986,
- escape_url,
expand_path,
extract_attributes,
+ extract_basic_auth,
find_xpath_attr,
fix_xml_ampersands,
float_or_none,
@@ -102,15 +102,16 @@ from yt_dlp.utils import (
sanitize_filename,
sanitize_path,
sanitize_url,
- sanitized_Request,
shell_quote,
smuggle_url,
+ str_or_none,
str_to_int,
strip_jsonp,
strip_or_none,
subtitles_filename,
timeconvert,
traverse_obj,
+ try_call,
unescapeHTML,
unified_strdate,
unified_timestamp,
@@ -122,12 +123,19 @@ from yt_dlp.utils import (
urlencode_postdata,
urljoin,
urshift,
+ variadic,
version_tuple,
xpath_attr,
xpath_element,
xpath_text,
xpath_with_ns,
)
+from yt_dlp.utils.networking import (
+ HTTPHeaderDict,
+ escape_rfc3986,
+ normalize_url,
+ remove_dot_segments,
+)
class TestUtil(unittest.TestCase):
@@ -254,15 +262,6 @@ class TestUtil(unittest.TestCase):
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
self.assertEqual(sanitize_url('foo bar'), 'foo bar')
- def test_extract_basic_auth(self):
- auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
- self.assertFalse(auth_header('http://foo.bar'))
- self.assertFalse(auth_header('http://:foo.bar'))
- self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
- self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
- self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
- self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
-
def test_expand_path(self):
def env(var):
return f'%{var}%' if sys.platform == 'win32' else f'${var}'
@@ -659,6 +658,8 @@ class TestUtil(unittest.TestCase):
self.assertEqual(parse_duration('P0Y0M0DT0H4M20.880S'), 260.88)
self.assertEqual(parse_duration('01:02:03:050'), 3723.05)
self.assertEqual(parse_duration('103:050'), 103.05)
+ self.assertEqual(parse_duration('1HR 3MIN'), 3780)
+ self.assertEqual(parse_duration('2hrs 3mins'), 7380)
def test_fix_xml_ampersands(self):
self.assertEqual(
@@ -935,24 +936,124 @@ class TestUtil(unittest.TestCase):
self.assertEqual(escape_rfc3986('foo bar'), 'foo%20bar')
self.assertEqual(escape_rfc3986('foo%20bar'), 'foo%20bar')
- def test_escape_url(self):
+ def test_normalize_url(self):
self.assertEqual(
- escape_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
+ normalize_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
'http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavre%CC%81_FD.mp4'
)
self.assertEqual(
- escape_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
+ normalize_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
'http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erkl%C3%A4rt/Das-Erste/Video?documentId=22673108&bcastId=5290'
)
self.assertEqual(
- escape_url('http://тест.рф/фрагмент'),
+ normalize_url('http://тест.рф/фрагмент'),
'http://xn--e1aybc.xn--p1ai/%D1%84%D1%80%D0%B0%D0%B3%D0%BC%D0%B5%D0%BD%D1%82'
)
self.assertEqual(
- escape_url('http://тест.рф/абв?абв=абв#абв'),
+ normalize_url('http://тест.рф/абв?абв=абв#абв'),
'http://xn--e1aybc.xn--p1ai/%D0%B0%D0%B1%D0%B2?%D0%B0%D0%B1%D0%B2=%D0%B0%D0%B1%D0%B2#%D0%B0%D0%B1%D0%B2'
)
- self.assertEqual(escape_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+ self.assertEqual(normalize_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+
+ self.assertEqual(normalize_url('http://www.example.com/../a/b/../c/./d.html'), 'http://www.example.com/a/c/d.html')
+
+ def test_remove_dot_segments(self):
+ self.assertEqual(remove_dot_segments('/a/b/c/./../../g'), '/a/g')
+ self.assertEqual(remove_dot_segments('mid/content=5/../6'), 'mid/6')
+ self.assertEqual(remove_dot_segments('/ad/../cd'), '/cd')
+ self.assertEqual(remove_dot_segments('/ad/../cd/'), '/cd/')
+ self.assertEqual(remove_dot_segments('/..'), '/')
+ self.assertEqual(remove_dot_segments('/./'), '/')
+ self.assertEqual(remove_dot_segments('/./a'), '/a')
+ self.assertEqual(remove_dot_segments('/abc/./.././d/././e/.././f/./../../ghi'), '/ghi')
+ self.assertEqual(remove_dot_segments('/'), '/')
+ self.assertEqual(remove_dot_segments('/t'), '/t')
+ self.assertEqual(remove_dot_segments('t'), 't')
+ self.assertEqual(remove_dot_segments(''), '')
+ self.assertEqual(remove_dot_segments('/../a/b/c'), '/a/b/c')
+ self.assertEqual(remove_dot_segments('../a'), 'a')
+ self.assertEqual(remove_dot_segments('./a'), 'a')
+ self.assertEqual(remove_dot_segments('.'), '')
+ self.assertEqual(remove_dot_segments('////'), '////')
+
+ def test_js_to_json_vars_strings(self):
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'null': a,
+ 'nullStr': b,
+ 'true': c,
+ 'trueStr': d,
+ 'false': e,
+ 'falseStr': f,
+ 'unresolvedVar': g,
+ }''',
+ {
+ 'a': 'null',
+ 'b': '"null"',
+ 'c': 'true',
+ 'd': '"true"',
+ 'e': 'false',
+ 'f': '"false"',
+ 'g': 'var',
+ }
+ )),
+ {
+ 'null': None,
+ 'nullStr': 'null',
+ 'true': True,
+ 'trueStr': 'true',
+ 'false': False,
+ 'falseStr': 'false',
+ 'unresolvedVar': 'var'
+ }
+ )
+
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'int': a,
+ 'intStr': b,
+ 'float': c,
+ 'floatStr': d,
+ }''',
+ {
+ 'a': '123',
+ 'b': '"123"',
+ 'c': '1.23',
+ 'd': '"1.23"',
+ }
+ )),
+ {
+ 'int': 123,
+ 'intStr': '123',
+ 'float': 1.23,
+ 'floatStr': '1.23',
+ }
+ )
+
+ self.assertDictEqual(
+ json.loads(js_to_json(
+ '''{
+ 'object': a,
+ 'objectStr': b,
+ 'array': c,
+ 'arrayStr': d,
+ }''',
+ {
+ 'a': '{}',
+ 'b': '"{}"',
+ 'c': '[]',
+ 'd': '"[]"',
+ }
+ )),
+ {
+ 'object': {},
+ 'objectStr': '{}',
+ 'array': [],
+ 'arrayStr': '[]',
+ }
+ )
def test_js_to_json_realworld(self):
inp = '''{
@@ -1110,6 +1211,13 @@ class TestUtil(unittest.TestCase):
self.assertEqual(js_to_json('42a1'), '42"a1"')
self.assertEqual(js_to_json('42a-1'), '42"a"-1')
+ def test_js_to_json_template_literal(self):
+ self.assertEqual(js_to_json('`Hello ${name}`', {'name': '"world"'}), '"Hello world"')
+ self.assertEqual(js_to_json('`${name}${name}`', {'name': '"X"'}), '"XX"')
+ self.assertEqual(js_to_json('`${name}${name}`', {'name': '5'}), '"55"')
+ self.assertEqual(js_to_json('`${name}"${name}"`', {'name': '5'}), '"5\\"5\\""')
+ self.assertEqual(js_to_json('`${name}`', {}), '"name"')
+
def test_extract_attributes(self):
self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'})
@@ -1745,6 +1853,8 @@ Line 1
def test_clean_podcast_url(self):
self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3')
self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3')
+ self.assertEqual(clean_podcast_url('https://pdst.fm/e/2.gum.fm/chtbl.com/track/chrt.fm/track/34D33/pscrb.fm/rss/p/traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661'), 'https://traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661')
+ self.assertEqual(clean_podcast_url('https://pdst.fm/e/https://mgln.ai/e/441/www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3'), 'https://www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3')
def test_LazyList(self):
it = list(range(10))
@@ -1874,6 +1984,8 @@ Line 1
vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['m4a']), 'mkv')
self.assertEqual(get_compatible_ext(
vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['webm']), 'webm')
+ self.assertEqual(get_compatible_ext(
+ vcodecs=[None], acodecs=[None], vexts=['webm'], aexts=['weba']), 'webm')
self.assertEqual(get_compatible_ext(
vcodecs=['h264'], acodecs=['mp4a'], vexts=['mov'], aexts=['m4a']), 'mp4')
@@ -1885,6 +1997,35 @@ Line 1
self.assertEqual(get_compatible_ext(
vcodecs=['av1'], acodecs=['mp4a'], vexts=['webm'], aexts=['m4a'], preferences=('webm', 'mkv')), 'mkv')
+ def test_try_call(self):
+ def total(*x, **kwargs):
+ return sum(x) + sum(kwargs.values())
+
+ self.assertEqual(try_call(None), None,
+ msg='not a fn should give None')
+ self.assertEqual(try_call(lambda: 1), 1,
+ msg='int fn with no expected_type should give int')
+ self.assertEqual(try_call(lambda: 1, expected_type=int), 1,
+ msg='int fn with expected_type int should give int')
+ self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
+ msg='int fn with wrong expected_type should give None')
+ self.assertEqual(try_call(total, args=(0, 1, 0, ), expected_type=int), 1,
+ msg='fn should accept arglist')
+ self.assertEqual(try_call(total, kwargs={'a': 0, 'b': 1, 'c': 0}, expected_type=int), 1,
+ msg='fn should accept kwargs')
+ self.assertEqual(try_call(lambda: 1, expected_type=dict), None,
+ msg='int fn with no expected_type should give None')
+ self.assertEqual(try_call(lambda x: {}, total, args=(42, ), expected_type=int), 42,
+ msg='expect first int result with expected_type int')
+
+ def test_variadic(self):
+ self.assertEqual(variadic(None), (None, ))
+ self.assertEqual(variadic('spam'), ('spam', ))
+ self.assertEqual(variadic('spam', allowed_types=dict), 'spam')
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore')
+ self.assertEqual(variadic('spam', allowed_types=[dict]), 'spam')
+
def test_traverse_obj(self):
_TEST_DATA = {
100: 100,
@@ -1918,8 +2059,8 @@ Line 1
# Test Ellipsis behavior
self.assertCountEqual(traverse_obj(_TEST_DATA, ...),
- (item for item in _TEST_DATA.values() if item is not None),
- msg='`...` should give all values except `None`')
+ (item for item in _TEST_DATA.values() if item not in (None, {})),
+ msg='`...` should give all non discarded values')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, ...)), _TEST_DATA['urls'][0].values(),
msg='`...` selection for dicts should select all values')
self.assertEqual(traverse_obj(_TEST_DATA, (..., ..., 'url')),
@@ -1927,6 +2068,8 @@ Line 1
msg='nested `...` queries should work')
self.assertCountEqual(traverse_obj(_TEST_DATA, (..., ..., 'index')), range(4),
msg='`...` query result should be flattened')
+ self.assertEqual(traverse_obj(iter(range(4)), ...), list(range(4)),
+ msg='`...` should accept iterables')
# Test function as key
self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)),
@@ -1934,6 +2077,42 @@ Line 1
msg='function as query key should perform a filter based on (key, value)')
self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], str)), {'str'},
msg='exceptions in the query function should be catched')
+ self.assertEqual(traverse_obj(iter(range(4)), lambda _, x: x % 2 == 0), [0, 2],
+ msg='function key should accept iterables')
+ if __debug__:
+ with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
+ traverse_obj(_TEST_DATA, lambda a: ...)
+ with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
+ traverse_obj(_TEST_DATA, lambda a, b, c: ...)
+
+ # Test set as key (transformation/type, like `expected_type`)
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str.upper}, )), ['STR'],
+ msg='Function in set should be a transformation')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str})), ['str'],
+ msg='Type in set should be a type filter')
+ self.assertEqual(traverse_obj(_TEST_DATA, {dict}), _TEST_DATA,
+ msg='A single set should be wrapped into a path')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str.upper})), ['STR'],
+ msg='Transformation function should not raise')
+ self.assertEqual(traverse_obj(_TEST_DATA, (..., {str_or_none})),
+ [item for item in map(str_or_none, _TEST_DATA.values()) if item is not None],
+ msg='Function in set should be a transformation')
+ if __debug__:
+ with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
+ traverse_obj(_TEST_DATA, set())
+ with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
+ traverse_obj(_TEST_DATA, {str.upper, str})
+
+ # Test `slice` as a key
+ _SLICE_DATA = [0, 1, 2, 3, 4]
+ self.assertEqual(traverse_obj(_TEST_DATA, ('dict', slice(1))), None,
+ msg='slice on a dictionary should not throw')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1)), _SLICE_DATA[:1],
+ msg='slice key should apply slice to sequence')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 2)), _SLICE_DATA[1:2],
+ msg='slice key should apply slice to sequence')
+ self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 4, 2)), _SLICE_DATA[1:4:2],
+ msg='slice key should apply slice to sequence')
# Test alternative paths
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str',
@@ -1979,15 +2158,23 @@ Line 1
{0: ['https://www.example.com/1', 'https://www.example.com/0']},
msg='tripple nesting in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {},
- msg='remove `None` values when dict key')
+ msg='remove `None` values when top level dict key fails')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=...), {0: ...},
- msg='do not remove `None` values if `default`')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {0: {}},
- msg='do not remove empty values when dict key')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: {}},
- msg='do not remove empty values when dict key and a default')
- self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {0: []},
- msg='if branch in dict key not successful, return `[]`')
+ msg='use `default` if key fails and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {},
+ msg='remove empty values when dict key')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: ...},
+ msg='use `default` when dict key and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}), {},
+ msg='remove empty values when nested dict key fails')
+ self.assertEqual(traverse_obj(None, {0: 'fail'}), {},
+ msg='default to dict if pruned')
+ self.assertEqual(traverse_obj(None, {0: 'fail'}, default=...), {0: ...},
+ msg='default to dict if pruned and default is given')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}, default=...), {0: {0: ...}},
+ msg='use nested `default` when nested dict key fails and `default`')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {},
+ msg='remove key if branch in dict key not successful')
# Testing default parameter behavior
_DEFAULT_DATA = {'None': None, 'int': 0, 'list': []}
@@ -2011,20 +2198,55 @@ Line 1
msg='if branched but not successful return `[]`, not `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', ...)), [],
msg='if branched but object is empty return `[]`, not `default`')
+ self.assertEqual(traverse_obj(None, ...), [],
+ msg='if branched but object is `None` return `[]`, not `default`')
+ self.assertEqual(traverse_obj({0: None}, (0, ...)), [],
+ msg='if branched but state is `None` return `[]`, not `default`')
+
+ branching_paths = [
+ ('fail', ...),
+ (..., 'fail'),
+ 100 * ('fail',) + (...,),
+ (...,) + 100 * ('fail',),
+ ]
+ for branching_path in branching_paths:
+ self.assertEqual(traverse_obj({}, branching_path), [],
+ msg='if branched but state is `None`, return `[]` (not `default`)')
+ self.assertEqual(traverse_obj({}, 'fail', branching_path), [],
+ msg='if branching in last alternative and previous did not match, return `[]` (not `default`)')
+ self.assertEqual(traverse_obj({0: 'x'}, 0, branching_path), 'x',
+ msg='if branching in last alternative and previous did match, return single value')
+ self.assertEqual(traverse_obj({0: 'x'}, branching_path, 0), 'x',
+ msg='if branching in first alternative and non-branching path does match, return single value')
+ self.assertEqual(traverse_obj({}, branching_path, 'fail'), None,
+ msg='if branching in first alternative and non-branching path does not match, return `default`')
# Testing expected_type behavior
_EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0}
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str), 'str',
- msg='accept matching `expected_type` type')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int), None,
- msg='reject non matching `expected_type` type')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)), '0',
- msg='transform type using type function')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str',
- expected_type=lambda _: 1 / 0), None,
- msg='wrap expected_type fuction in try_call')
- self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str), ['str'],
- msg='eliminate items that expected_type fails on')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str),
+ 'str', msg='accept matching `expected_type` type')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int),
+ None, msg='reject non matching `expected_type` type')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)),
+ '0', msg='transform type using type function')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=lambda _: 1 / 0),
+ None, msg='wrap expected_type fuction in try_call')
+ self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str),
+ ['str'], msg='eliminate items that expected_type fails on')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int),
+ {0: 100}, msg='type as expected_type should filter dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none),
+ {0: '100', 1: '1.2'}, msg='function as expected_type should transform dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, {int_or_none}), expected_type=int),
+ 1, msg='expected_type should not filter non final dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int),
+ {0: {0: 100}}, msg='expected_type should transform deep dict values')
+ self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(...)),
+ [{0: ...}, {0: ...}], msg='expected_type should transform branched dict values')
+ self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int),
+ [4], msg='expected_type regression for type matching in tuple branching')
+ self.assertEqual(traverse_obj(_TEST_DATA, ['data', ...], expected_type=int),
+ [], msg='expected_type regression for type matching in dict result')
# Test get_all behavior
_GET_ALL_DATA = {'key': [0, 1, 2]}
@@ -2064,14 +2286,23 @@ Line 1
traverse_string=True), '.',
msg='traverse into converted data if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', ...),
- traverse_string=True), list('str'),
- msg='`...` branching into string should result in list')
+ traverse_string=True), 'str',
+ msg='`...` should result in string (same value) if `traverse_string`')
+ self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', slice(0, None, 2)),
+ traverse_string=True), 'sr',
+ msg='`slice` should result in string if `traverse_string`')
+ self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda i, v: i or v == "s"),
+ traverse_string=True), 'str',
+ msg='function should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)),
traverse_string=True), ['s', 'r'],
- msg='branching into string should result in list')
- self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda _, x: x),
- traverse_string=True), list('str'),
- msg='function branching into string should result in list')
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, ...), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, lambda x, y: True), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
+ self.assertEqual(traverse_obj({}, (0, slice(1)), traverse_string=True), [],
+ msg='branching should result in list if `traverse_string`')
# Test is_user_input behavior
_IS_USER_INPUT_DATA = {'range8': list(range(8))}
@@ -2108,6 +2339,48 @@ Line 1
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 8), None,
msg='failing int key on a `re.Match` should return `default`')
+ self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
+ msg='function on a `re.Match` should give group name as well')
+
+ def test_http_header_dict(self):
+ headers = HTTPHeaderDict()
+ headers['ytdl-test'] = b'0'
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '0')])
+ headers['ytdl-test'] = 1
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')])
+ headers['Ytdl-test'] = '2'
+ self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')])
+ self.assertTrue('ytDl-Test' in headers)
+ self.assertEqual(str(headers), str(dict(headers)))
+ self.assertEqual(repr(headers), str(dict(headers)))
+
+ headers.update({'X-dlp': 'data'})
+ self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')})
+ self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'})
+ self.assertEqual(len(headers), 2)
+ self.assertEqual(headers.copy(), headers)
+ headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'})
+ self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')})
+ self.assertEqual(len(headers2), 2)
+ headers2.clear()
+ self.assertEqual(len(headers2), 0)
+
+ # ensure we prefer latter headers
+ headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2})
+ self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')})
+ del headers3['ytdl-tesT']
+ self.assertEqual(dict(headers3), {})
+
+ headers4 = HTTPHeaderDict({'ytdl-test': 'data;'})
+ self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')})
+
+ def test_extract_basic_auth(self):
+ assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None)
+ assert extract_basic_auth('http://foo.bar') == ('http://foo.bar', None)
+ assert extract_basic_auth('http://@foo.bar') == ('http://foo.bar', 'Basic Og==')
+ assert extract_basic_auth('http://:pass@foo.bar') == ('http://foo.bar', 'Basic OnBhc3M=')
+ assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
+ assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
if __name__ == '__main__':
diff --git a/test/test_youtube_signature.py b/test/test_youtube_signature.py
index 6d753fbf0..c5592845b 100644
--- a/test/test_youtube_signature.py
+++ b/test/test_youtube_signature.py
@@ -62,11 +62,20 @@ _SIG_TESTS = [
'https://s.ytimg.com/yts/jsbin/html5player-en_US-vflKjOTVq/html5player.js',
'312AA52209E3623129A412D56A40F11CB0AF14AE.3EE09501CB14E3BCDC3B2AE808BF3F1D14E7FBF12',
'112AA5220913623229A412D56A40F11CB0AF14AE.3EE0950FCB14EEBCDC3B2AE808BF331D14E7FBF3',
- )
+ ),
+ (
+ 'https://www.youtube.com/s/player/6ed0d907/player_ias.vflset/en_US/base.js',
+ '2aq0aqSyOoJXtK73m-uME_jv7-pT15gOFC02RFkGMqWpzEICs69VdbwQ0LDp1v7j8xx92efCJlYFYb1sUkkBSPOlPmXgIARw8JQ0qOAOAA',
+ 'AOq0QJ8wRAIgXmPlOPSBkkUs1bYFYlJCfe29xx8j7v1pDL2QwbdV96sCIEzpWqMGkFR20CFOg51Tp-7vj_EMu-m37KtXJoOySqa0',
+ ),
]
_NSIG_TESTS = [
(
+ 'https://www.youtube.com/s/player/7862ca1f/player_ias.vflset/en_US/base.js',
+ 'X_LCxVDjAavgE5t', 'yxJ1dM6iz5ogUg',
+ ),
+ (
'https://www.youtube.com/s/player/9216d1f7/player_ias.vflset/en_US/base.js',
'SLp9F5bwjAdhE9F-', 'gWnb9IK2DJ8Q1w',
),
@@ -134,6 +143,26 @@ _NSIG_TESTS = [
'https://www.youtube.com/s/player/7a062b77/player_ias.vflset/en_US/base.js',
'NRcE3y3mVtm_cV-W', 'VbsCYUATvqlt5w',
),
+ (
+ 'https://www.youtube.com/s/player/dac945fd/player_ias.vflset/en_US/base.js',
+ 'o8BkRxXhuYsBCWi6RplPdP', '3Lx32v_hmzTm6A',
+ ),
+ (
+ 'https://www.youtube.com/s/player/6f20102c/player_ias.vflset/en_US/base.js',
+ 'lE8DhoDmKqnmJJ', 'pJTTX6XyJP2BYw',
+ ),
+ (
+ 'https://www.youtube.com/s/player/cfa9e7cb/player_ias.vflset/en_US/base.js',
+ 'aCi3iElgd2kq0bxVbQ', 'QX1y8jGb2IbZ0w',
+ ),
+ (
+ 'https://www.youtube.com/s/player/8c7583ff/player_ias.vflset/en_US/base.js',
+ '1wWCVpRR96eAmMI87L', 'KSkWAVv1ZQxC3A',
+ ),
+ (
+ 'https://www.youtube.com/s/player/b7910ca8/player_ias.vflset/en_US/base.js',
+ '_hXMCwMt9qE310D', 'LoZMgkkofRMCZQ',
+ ),
]
@@ -210,7 +239,7 @@ def n_sig(jscode, sig_input):
make_sig_test = t_factory(
- 'signature', signature, re.compile(r'.*-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.[a-z]+$'))
+ 'signature', signature, re.compile(r'.*(?:-|/player/)(?P<id>[a-zA-Z0-9_-]+)(?:/.+\.js|(?:/watch_as3|/html5player)?\.[a-z]+)$'))
for test_spec in _SIG_TESTS:
make_sig_test(*test_spec)
diff --git a/test/testdata/yt_dlp_plugins/extractor/_ignore.py b/test/testdata/yt_dlp_plugins/extractor/_ignore.py
new file mode 100644
index 000000000..57faf75bb
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/_ignore.py
@@ -0,0 +1,5 @@
+from yt_dlp.extractor.common import InfoExtractor
+
+
+class IgnorePluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/yt_dlp_plugins/extractor/ignore.py b/test/testdata/yt_dlp_plugins/extractor/ignore.py
new file mode 100644
index 000000000..816a16aa2
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/ignore.py
@@ -0,0 +1,12 @@
+from yt_dlp.extractor.common import InfoExtractor
+
+
+class IgnoreNotInAllPluginIE(InfoExtractor):
+ pass
+
+
+class InAllPluginIE(InfoExtractor):
+ pass
+
+
+__all__ = ['InAllPluginIE']
diff --git a/test/testdata/yt_dlp_plugins/extractor/normal.py b/test/testdata/yt_dlp_plugins/extractor/normal.py
new file mode 100644
index 000000000..b09009bdc
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/extractor/normal.py
@@ -0,0 +1,9 @@
+from yt_dlp.extractor.common import InfoExtractor
+
+
+class NormalPluginIE(InfoExtractor):
+ pass
+
+
+class _IgnoreUnderscorePluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/yt_dlp_plugins/postprocessor/normal.py b/test/testdata/yt_dlp_plugins/postprocessor/normal.py
new file mode 100644
index 000000000..315b85a48
--- /dev/null
+++ b/test/testdata/yt_dlp_plugins/postprocessor/normal.py
@@ -0,0 +1,5 @@
+from yt_dlp.postprocessor.common import PostProcessor
+
+
+class NormalPluginPP(PostProcessor):
+ pass
diff --git a/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py b/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py
new file mode 100644
index 000000000..01542e0d8
--- /dev/null
+++ b/test/testdata/zipped_plugins/yt_dlp_plugins/extractor/zipped.py
@@ -0,0 +1,5 @@
+from yt_dlp.extractor.common import InfoExtractor
+
+
+class ZippedPluginIE(InfoExtractor):
+ pass
diff --git a/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py b/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py
new file mode 100644
index 000000000..223822bd6
--- /dev/null
+++ b/test/testdata/zipped_plugins/yt_dlp_plugins/postprocessor/zipped.py
@@ -0,0 +1,5 @@
+from yt_dlp.postprocessor.common import PostProcessor
+
+
+class ZippedPluginPP(PostProcessor):
+ pass