diff options
Diffstat (limited to 'hypervideo_dl/extractor/instagram.py')
-rw-r--r-- | hypervideo_dl/extractor/instagram.py | 359 |
1 files changed, 209 insertions, 150 deletions
diff --git a/hypervideo_dl/extractor/instagram.py b/hypervideo_dl/extractor/instagram.py index 970f2c8..0233513 100644 --- a/hypervideo_dl/extractor/instagram.py +++ b/hypervideo_dl/extractor/instagram.py @@ -1,19 +1,17 @@ -# coding: utf-8 - -import itertools import hashlib +import itertools import json import re import time +import urllib.error from .common import InfoExtractor -from ..compat import ( - compat_HTTPError, -) from ..utils import ( ExtractorError, - format_field, + decode_base_n, + encode_base_n, float_or_none, + format_field, get_element_by_attribute, int_or_none, lowercase_escape, @@ -24,42 +22,59 @@ from ..utils import ( urlencode_postdata, ) +_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' + + +def _pk_to_id(id): + """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id""" + return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS) + + +def _id_to_pk(shortcode): + """Covert a shortcode to a numeric value""" + return decode_base_n(shortcode[:11], table=_ENCODING_CHARS) + class InstagramBaseIE(InfoExtractor): _NETRC_MACHINE = 'instagram' _IS_LOGGED_IN = False + _API_BASE_URL = 'https://i.instagram.com/api/v1' + _LOGIN_URL = 'https://www.instagram.com/accounts/login' + _API_HEADERS = { + 'X-IG-App-ID': '936619743392459', + 'X-ASBD-ID': '198387', + 'X-IG-WWW-Claim': '0', + 'Origin': 'https://www.instagram.com', + 'Accept': '*/*', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36', + } + def _perform_login(self, username, password): if self._IS_LOGGED_IN: return login_webpage = self._download_webpage( - 'https://www.instagram.com/accounts/login/', None, - note='Downloading login webpage', errnote='Failed to download login webpage') + self._LOGIN_URL, None, note='Downloading login webpage', errnote='Failed to download login webpage') - shared_data = self._parse_json( - self._search_regex( - r'window\._sharedData\s*=\s*({.+?});', - login_webpage, 'shared data', default='{}'), - None) - - login = self._download_json('https://www.instagram.com/accounts/login/ajax/', None, note='Logging in', headers={ - 'Accept': '*/*', - 'X-IG-App-ID': '936619743392459', - 'X-ASBD-ID': '198387', - 'X-IG-WWW-Claim': '0', - 'X-Requested-With': 'XMLHttpRequest', - 'X-CSRFToken': shared_data['config']['csrf_token'], - 'X-Instagram-AJAX': shared_data['rollout_hash'], - 'Referer': 'https://www.instagram.com/', - }, data=urlencode_postdata({ - 'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}', - 'username': username, - 'queryParams': '{}', - 'optIntoOneTap': 'false', - 'stopDeletionNonce': '', - 'trustedDeviceRecords': '{}', - })) + shared_data = self._parse_json(self._search_regex( + r'window\._sharedData\s*=\s*({.+?});', login_webpage, 'shared data', default='{}'), None) + + login = self._download_json( + f'{self._LOGIN_URL}/ajax/', None, note='Logging in', headers={ + **self._API_HEADERS, + 'X-Requested-With': 'XMLHttpRequest', + 'X-CSRFToken': shared_data['config']['csrf_token'], + 'X-Instagram-AJAX': shared_data['rollout_hash'], + 'Referer': 'https://www.instagram.com/', + }, data=urlencode_postdata({ + 'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}', + 'username': username, + 'queryParams': '{}', + 'optIntoOneTap': 'false', + 'stopDeletionNonce': '', + 'trustedDeviceRecords': '{}', + })) if not login.get('authenticated'): if login.get('message'): @@ -124,7 +139,7 @@ class InstagramBaseIE(InfoExtractor): } def _extract_product_media(self, product_media): - media_id = product_media.get('code') or product_media.get('id') + media_id = product_media.get('code') or _pk_to_id(product_media.get('pk')) vcodec = product_media.get('video_codec') dash_manifest_raw = product_media.get('video_dash_manifest') videos_list = product_media.get('video_versions') @@ -140,7 +155,6 @@ class InstagramBaseIE(InfoExtractor): } for format in videos_list or []] if dash_manifest_raw: formats.extend(self._parse_mpd_formats(self._parse_xml(dash_manifest_raw, media_id), mpd_id='dash')) - self._sort_formats(formats) thumbnails = [{ 'url': thumbnail.get('url'), @@ -160,7 +174,7 @@ class InstagramBaseIE(InfoExtractor): user_info = product_info.get('user') or {} info_dict = { - 'id': product_info.get('code') or product_info.get('id'), + 'id': _pk_to_id(traverse_obj(product_info, 'pk', 'id', expected_type=str_or_none)[:19]), 'title': product_info.get('title') or f'Video by {user_info.get("username")}', 'description': traverse_obj(product_info, ('caption', 'text'), expected_type=str_or_none), 'timestamp': int_or_none(product_info.get('taken_at')), @@ -170,6 +184,7 @@ class InstagramBaseIE(InfoExtractor): 'view_count': int_or_none(product_info.get('view_count')), 'like_count': int_or_none(product_info.get('like_count')), 'comment_count': int_or_none(product_info.get('comment_count')), + '__post_extractor': self.extract_comments(_pk_to_id(product_info.get('pk'))), 'http_headers': { 'Referer': 'https://www.instagram.com/', } @@ -191,6 +206,23 @@ class InstagramBaseIE(InfoExtractor): **self._extract_product_media(product_info) } + def _get_comments(self, video_id): + comments_info = self._download_json( + f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/comments/?can_support_threading=true&permalink_enabled=false', video_id, + fatal=False, errnote='Comments extraction failed', note='Downloading comments info', headers=self._API_HEADERS) or {} + + comment_data = traverse_obj(comments_info, ('edge_media_to_parent_comment', 'edges'), 'comments') + for comment_dict in comment_data or []: + yield { + 'author': traverse_obj(comment_dict, ('node', 'owner', 'username'), ('user', 'username')), + 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id'), ('user', 'pk')), + 'author_thumbnail': traverse_obj(comment_dict, ('node', 'owner', 'profile_pic_url'), ('user', 'profile_pic_url'), expected_type=url_or_none), + 'id': traverse_obj(comment_dict, ('node', 'id'), 'pk'), + 'text': traverse_obj(comment_dict, ('node', 'text'), 'text'), + 'like_count': traverse_obj(comment_dict, ('node', 'edge_liked_by', 'count'), 'comment_like_count', expected_type=int_or_none), + 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), 'created_at', expected_type=int_or_none), + } + class InstagramIOSIE(InfoExtractor): IE_DESC = 'IOS instagram:// URL' @@ -216,27 +248,14 @@ class InstagramIOSIE(InfoExtractor): 'add_ie': ['Instagram'] }] - def _get_id(self, id): - """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id""" - chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' - media_id = int(id.split('_')[0]) - shortened_id = '' - while media_id > 0: - r = media_id % 64 - media_id = (media_id - r) // 64 - shortened_id = chrs[r] + shortened_id - return shortened_id - def _real_extract(self, url): - return { - '_type': 'url_transparent', - 'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/', - 'ie_key': 'Instagram', - } + video_id = _pk_to_id(self._match_id(url)) + return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id) class InstagramIE(InstagramBaseIE): _VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reel)/(?P<id>[^/?#&]+))' + _EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1'] _TESTS = [{ 'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc', 'md5': '0d2da106a9d2631273e192b372806516', @@ -246,7 +265,7 @@ class InstagramIE(InstagramBaseIE): 'title': 'Video by naomipq', 'description': 'md5:1f17f0ab29bd6fe2bfad705f58de3cb8', 'thumbnail': r're:^https?://.*\.jpg', - 'duration': 0, + 'duration': 8.747, 'timestamp': 1371748545, 'upload_date': '20130620', 'uploader_id': '2815873', @@ -256,27 +275,34 @@ class InstagramIE(InstagramBaseIE): 'comment_count': int, 'comments': list, }, + 'expected_warnings': [ + 'General metadata extraction failed', + 'Main webpage is locked behind the login page', + ], }, { - # missing description - 'url': 'https://www.instagram.com/p/BA-pQFBG8HZ/?taken-by=britneyspears', + # reel + 'url': 'https://www.instagram.com/reel/Chunk8-jurw/', + 'md5': 'f6d8277f74515fa3ff9f5791426e42b1', 'info_dict': { - 'id': 'BA-pQFBG8HZ', + 'id': 'Chunk8-jurw', 'ext': 'mp4', - 'title': 'Video by britneyspears', + 'title': 'Video by instagram', + 'description': 'md5:c9cde483606ed6f80fbe9283a6a2b290', 'thumbnail': r're:^https?://.*\.jpg', - 'duration': 0, - 'timestamp': 1453760977, - 'upload_date': '20160125', - 'uploader_id': '12246775', - 'uploader': 'Britney Spears', - 'channel': 'britneyspears', + 'duration': 5.016, + 'timestamp': 1661529231, + 'upload_date': '20220826', + 'uploader_id': '25025320', + 'uploader': 'Instagram', + 'channel': 'instagram', 'like_count': int, 'comment_count': int, 'comments': list, }, - 'params': { - 'skip_download': True, - }, + 'expected_warnings': [ + 'General metadata extraction failed', + 'Main webpage is locked behind the login page', + ], }, { # multi video post 'url': 'https://www.instagram.com/p/BQ0eAlwhDrw/', @@ -285,18 +311,24 @@ class InstagramIE(InstagramBaseIE): 'id': 'BQ0dSaohpPW', 'ext': 'mp4', 'title': 'Video 1', + 'thumbnail': r're:^https?://.*\.jpg', + 'view_count': int, }, }, { 'info_dict': { 'id': 'BQ0dTpOhuHT', 'ext': 'mp4', 'title': 'Video 2', + 'thumbnail': r're:^https?://.*\.jpg', + 'view_count': int, }, }, { 'info_dict': { 'id': 'BQ0dT7RBFeF', 'ext': 'mp4', 'title': 'Video 3', + 'thumbnail': r're:^https?://.*\.jpg', + 'view_count': int, }, }], 'info_dict': { @@ -304,6 +336,10 @@ class InstagramIE(InstagramBaseIE): 'title': 'Post by instagram', 'description': 'md5:0f9203fc6a2ce4d228da5754bcf54957', }, + 'expected_warnings': [ + 'General metadata extraction failed', + 'Main webpage is locked behind the login page', + ], }, { # IGTV 'url': 'https://www.instagram.com/tv/BkfuX9UB-eK/', @@ -322,7 +358,11 @@ class InstagramIE(InstagramBaseIE): 'comment_count': int, 'comments': list, 'description': 'Meet Cass Hirst (@cass.fb), a fingerboarding pro who can perform tiny ollies and kickflips while blindfolded.', - } + }, + 'expected_warnings': [ + 'General metadata extraction failed', + 'Main webpage is locked behind the login page', + ], }, { 'url': 'https://instagram.com/p/-Cmh1cukG2/', 'only_matching': True, @@ -340,59 +380,88 @@ class InstagramIE(InstagramBaseIE): 'only_matching': True, }] - @staticmethod - def _extract_embed_url(webpage): - mobj = re.search( - r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1', - webpage) - if mobj: - return mobj.group('url') - - blockquote_el = get_element_by_attribute( - 'class', 'instagram-media', webpage) - if blockquote_el is None: - return + @classmethod + def _extract_embed_urls(cls, url, webpage): + res = tuple(super()._extract_embed_urls(url, webpage)) + if res: + return res - mobj = re.search( - r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1', blockquote_el) + mobj = re.search(r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1', + get_element_by_attribute('class', 'instagram-media', webpage) or '') if mobj: - return mobj.group('link') + return [mobj.group('link')] def _real_extract(self, url): video_id, url = self._match_valid_url(url).group('id', 'url') - webpage, urlh = self._download_webpage_handle(url, video_id) - if 'www.instagram.com/accounts/login' in urlh.geturl(): - self.report_warning('Main webpage is locked behind the login page. ' - 'Retrying with embed webpage (Note that some metadata might be missing)') - webpage = self._download_webpage( - 'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage') - - shared_data = self._parse_json( - self._search_regex( - r'window\._sharedData\s*=\s*({.+?});', - webpage, 'shared data', default='{}'), - video_id, fatal=False) - media = traverse_obj( - shared_data, - ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'), - ('entry_data', 'PostPage', 0, 'media'), - expected_type=dict) - - # _sharedData.entry_data.PostPage is empty when authenticated (see - # https://github.com/ytdl-org/youtube-dl/pull/22880) - if not media: - additional_data = self._parse_json( - self._search_regex( - r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);', - webpage, 'additional data', default='{}'), - video_id, fatal=False) - product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict) - if product_item: - return self._extract_product(product_item) - media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {} - - if not media and 'www.instagram.com/accounts/login' in urlh.geturl(): - self.raise_login_required('You need to log in to access this content') + media, webpage = {}, '' + + if self._get_cookies(url).get('sessionid'): + info = traverse_obj(self._download_json( + f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/info/', video_id, + fatal=False, errnote='Video info extraction failed', + note='Downloading video info', headers=self._API_HEADERS), ('items', 0)) + if info: + media.update(info) + return self._extract_product(media) + + api_check = self._download_json( + f'{self._API_BASE_URL}/web/get_ruling_for_content/?content_type=MEDIA&target_id={_id_to_pk(video_id)}', + video_id, headers=self._API_HEADERS, fatal=False, note='Setting up session', errnote=False) or {} + csrf_token = self._get_cookies('https://www.instagram.com').get('csrftoken') + + if not csrf_token: + self.report_warning('No csrf token set by Instagram API', video_id) + else: + csrf_token = csrf_token.value if api_check.get('status') == 'ok' else None + if not csrf_token: + self.report_warning('Instagram API is not granting access', video_id) + + variables = { + 'shortcode': video_id, + 'child_comment_count': 3, + 'fetch_comment_count': 40, + 'parent_comment_count': 24, + 'has_threaded_comments': True, + } + general_info = self._download_json( + 'https://www.instagram.com/graphql/query/', video_id, fatal=False, errnote=False, + headers={ + **self._API_HEADERS, + 'X-CSRFToken': csrf_token or '', + 'X-Requested-With': 'XMLHttpRequest', + 'Referer': url, + }, query={ + 'query_hash': '9f8827793ef34641b2fb195d4d41151c', + 'variables': json.dumps(variables, separators=(',', ':')), + }) + media.update(traverse_obj(general_info, ('data', 'shortcode_media')) or {}) + + if not general_info: + self.report_warning('General metadata extraction failed (some metadata might be missing).', video_id) + webpage, urlh = self._download_webpage_handle(url, video_id) + shared_data = self._search_json( + r'window\._sharedData\s*=', webpage, 'shared data', video_id, fatal=False) or {} + + if shared_data and self._LOGIN_URL not in urlh.geturl(): + media.update(traverse_obj( + shared_data, ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'), + ('entry_data', 'PostPage', 0, 'media'), expected_type=dict) or {}) + else: + self.report_warning('Main webpage is locked behind the login page. Retrying with embed webpage (some metadata might be missing).') + webpage = self._download_webpage( + f'{url}/embed/', video_id, note='Downloading embed webpage', fatal=False) + additional_data = self._search_json( + r'window\.__additionalDataLoaded\s*\(\s*[^,]+,', webpage, 'additional data', video_id, fatal=False) + if not additional_data and not media: + self.raise_login_required('Requested content is not available, rate-limit reached or login required') + + product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict) + if product_item: + media.update(product_item) + return self._extract_product(media) + + media.update(traverse_obj( + additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}) username = traverse_obj(media, ('owner', 'username')) or self._search_regex( r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False) @@ -412,7 +481,7 @@ class InstagramIE(InstagramBaseIE): if nodes: return self.playlist_result( self._extract_nodes(nodes, True), video_id, - format_field(username, template='Post by %s'), description) + format_field(username, None, 'Post by %s'), description) video_url = self._og_search_video_url(webpage, secure=False) @@ -424,7 +493,6 @@ class InstagramIE(InstagramBaseIE): dash = traverse_obj(media, ('dash_info', 'video_dash_manifest')) if dash: formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash')) - self._sort_formats(formats) comment_data = traverse_obj(media, ('edge_media_to_parent_comment', 'edges')) comments = [{ @@ -521,7 +589,7 @@ class InstagramPlaylistBaseIE(InstagramBaseIE): except ExtractorError as e: # if it's an error caused by a bad query, and there are # more GIS templates to try, ignore it and keep trying - if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403: + if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403: if gis_tmpl != gis_tmpls[-1]: continue raise @@ -631,41 +699,32 @@ class InstagramStoryIE(InstagramBaseIE): def _real_extract(self, url): username, story_id = self._match_valid_url(url).groups() - - story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1' - story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={ - 'X-IG-App-ID': 936619743392459, - 'X-ASBD-ID': 198387, - 'X-IG-WWW-Claim': 0, - 'X-Requested-With': 'XMLHttpRequest', - 'Referer': url, - }) - user_id = story_info['user']['id'] - highlight_title = traverse_obj(story_info, ('highlight', 'title')) + story_info = self._download_webpage(url, story_id) + user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False) + if not user_info: + self.raise_login_required('This content is unreachable') + user_id = user_info.get('id') story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}' - videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={ - 'X-IG-App-ID': 936619743392459, - 'X-ASBD-ID': 198387, - 'X-IG-WWW-Claim': 0, - })['reels'] - - full_name = traverse_obj(videos, ('user', 'full_name')) - - user_info = {} - if not (username and username != 'highlights' and full_name): - user_info = self._download_json( - f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={ - 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)', - }, note='Downloading user info') + videos = traverse_obj(self._download_json( + f'{self._API_BASE_URL}/feed/reels_media/?reel_ids={story_info_url}', + story_id, errnote=False, fatal=False, headers=self._API_HEADERS), 'reels') + if not videos: + self.raise_login_required('You need to log in to access this content') - username = traverse_obj(user_info, ('user', 'username')) or username - full_name = traverse_obj(user_info, ('user', 'full_name')) or full_name + full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name')) + story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title')) + if not story_title: + story_title = f'Story by {username}' highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items')) - return self.playlist_result([{ - **self._extract_product(highlight), - 'title': f'Story by {username}', - 'uploader': full_name, - 'uploader_id': user_id, - } for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title) + info_data = [] + for highlight in highlights: + highlight_data = self._extract_product(highlight) + if highlight_data.get('formats'): + info_data.append({ + **highlight_data, + 'uploader': full_name, + 'uploader_id': user_id, + }) + return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title) |