aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/extractor/instagram.py
diff options
context:
space:
mode:
Diffstat (limited to 'hypervideo_dl/extractor/instagram.py')
-rw-r--r--hypervideo_dl/extractor/instagram.py359
1 files changed, 209 insertions, 150 deletions
diff --git a/hypervideo_dl/extractor/instagram.py b/hypervideo_dl/extractor/instagram.py
index 970f2c8..0233513 100644
--- a/hypervideo_dl/extractor/instagram.py
+++ b/hypervideo_dl/extractor/instagram.py
@@ -1,19 +1,17 @@
-# coding: utf-8
-
-import itertools
import hashlib
+import itertools
import json
import re
import time
+import urllib.error
from .common import InfoExtractor
-from ..compat import (
- compat_HTTPError,
-)
from ..utils import (
ExtractorError,
- format_field,
+ decode_base_n,
+ encode_base_n,
float_or_none,
+ format_field,
get_element_by_attribute,
int_or_none,
lowercase_escape,
@@ -24,42 +22,59 @@ from ..utils import (
urlencode_postdata,
)
+_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
+
+
+def _pk_to_id(id):
+ """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
+ return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS)
+
+
+def _id_to_pk(shortcode):
+ """Covert a shortcode to a numeric value"""
+ return decode_base_n(shortcode[:11], table=_ENCODING_CHARS)
+
class InstagramBaseIE(InfoExtractor):
_NETRC_MACHINE = 'instagram'
_IS_LOGGED_IN = False
+ _API_BASE_URL = 'https://i.instagram.com/api/v1'
+ _LOGIN_URL = 'https://www.instagram.com/accounts/login'
+ _API_HEADERS = {
+ 'X-IG-App-ID': '936619743392459',
+ 'X-ASBD-ID': '198387',
+ 'X-IG-WWW-Claim': '0',
+ 'Origin': 'https://www.instagram.com',
+ 'Accept': '*/*',
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36',
+ }
+
def _perform_login(self, username, password):
if self._IS_LOGGED_IN:
return
login_webpage = self._download_webpage(
- 'https://www.instagram.com/accounts/login/', None,
- note='Downloading login webpage', errnote='Failed to download login webpage')
+ self._LOGIN_URL, None, note='Downloading login webpage', errnote='Failed to download login webpage')
- shared_data = self._parse_json(
- self._search_regex(
- r'window\._sharedData\s*=\s*({.+?});',
- login_webpage, 'shared data', default='{}'),
- None)
-
- login = self._download_json('https://www.instagram.com/accounts/login/ajax/', None, note='Logging in', headers={
- 'Accept': '*/*',
- 'X-IG-App-ID': '936619743392459',
- 'X-ASBD-ID': '198387',
- 'X-IG-WWW-Claim': '0',
- 'X-Requested-With': 'XMLHttpRequest',
- 'X-CSRFToken': shared_data['config']['csrf_token'],
- 'X-Instagram-AJAX': shared_data['rollout_hash'],
- 'Referer': 'https://www.instagram.com/',
- }, data=urlencode_postdata({
- 'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}',
- 'username': username,
- 'queryParams': '{}',
- 'optIntoOneTap': 'false',
- 'stopDeletionNonce': '',
- 'trustedDeviceRecords': '{}',
- }))
+ shared_data = self._parse_json(self._search_regex(
+ r'window\._sharedData\s*=\s*({.+?});', login_webpage, 'shared data', default='{}'), None)
+
+ login = self._download_json(
+ f'{self._LOGIN_URL}/ajax/', None, note='Logging in', headers={
+ **self._API_HEADERS,
+ 'X-Requested-With': 'XMLHttpRequest',
+ 'X-CSRFToken': shared_data['config']['csrf_token'],
+ 'X-Instagram-AJAX': shared_data['rollout_hash'],
+ 'Referer': 'https://www.instagram.com/',
+ }, data=urlencode_postdata({
+ 'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}',
+ 'username': username,
+ 'queryParams': '{}',
+ 'optIntoOneTap': 'false',
+ 'stopDeletionNonce': '',
+ 'trustedDeviceRecords': '{}',
+ }))
if not login.get('authenticated'):
if login.get('message'):
@@ -124,7 +139,7 @@ class InstagramBaseIE(InfoExtractor):
}
def _extract_product_media(self, product_media):
- media_id = product_media.get('code') or product_media.get('id')
+ media_id = product_media.get('code') or _pk_to_id(product_media.get('pk'))
vcodec = product_media.get('video_codec')
dash_manifest_raw = product_media.get('video_dash_manifest')
videos_list = product_media.get('video_versions')
@@ -140,7 +155,6 @@ class InstagramBaseIE(InfoExtractor):
} for format in videos_list or []]
if dash_manifest_raw:
formats.extend(self._parse_mpd_formats(self._parse_xml(dash_manifest_raw, media_id), mpd_id='dash'))
- self._sort_formats(formats)
thumbnails = [{
'url': thumbnail.get('url'),
@@ -160,7 +174,7 @@ class InstagramBaseIE(InfoExtractor):
user_info = product_info.get('user') or {}
info_dict = {
- 'id': product_info.get('code') or product_info.get('id'),
+ 'id': _pk_to_id(traverse_obj(product_info, 'pk', 'id', expected_type=str_or_none)[:19]),
'title': product_info.get('title') or f'Video by {user_info.get("username")}',
'description': traverse_obj(product_info, ('caption', 'text'), expected_type=str_or_none),
'timestamp': int_or_none(product_info.get('taken_at')),
@@ -170,6 +184,7 @@ class InstagramBaseIE(InfoExtractor):
'view_count': int_or_none(product_info.get('view_count')),
'like_count': int_or_none(product_info.get('like_count')),
'comment_count': int_or_none(product_info.get('comment_count')),
+ '__post_extractor': self.extract_comments(_pk_to_id(product_info.get('pk'))),
'http_headers': {
'Referer': 'https://www.instagram.com/',
}
@@ -191,6 +206,23 @@ class InstagramBaseIE(InfoExtractor):
**self._extract_product_media(product_info)
}
+ def _get_comments(self, video_id):
+ comments_info = self._download_json(
+ f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/comments/?can_support_threading=true&permalink_enabled=false', video_id,
+ fatal=False, errnote='Comments extraction failed', note='Downloading comments info', headers=self._API_HEADERS) or {}
+
+ comment_data = traverse_obj(comments_info, ('edge_media_to_parent_comment', 'edges'), 'comments')
+ for comment_dict in comment_data or []:
+ yield {
+ 'author': traverse_obj(comment_dict, ('node', 'owner', 'username'), ('user', 'username')),
+ 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id'), ('user', 'pk')),
+ 'author_thumbnail': traverse_obj(comment_dict, ('node', 'owner', 'profile_pic_url'), ('user', 'profile_pic_url'), expected_type=url_or_none),
+ 'id': traverse_obj(comment_dict, ('node', 'id'), 'pk'),
+ 'text': traverse_obj(comment_dict, ('node', 'text'), 'text'),
+ 'like_count': traverse_obj(comment_dict, ('node', 'edge_liked_by', 'count'), 'comment_like_count', expected_type=int_or_none),
+ 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), 'created_at', expected_type=int_or_none),
+ }
+
class InstagramIOSIE(InfoExtractor):
IE_DESC = 'IOS instagram:// URL'
@@ -216,27 +248,14 @@ class InstagramIOSIE(InfoExtractor):
'add_ie': ['Instagram']
}]
- def _get_id(self, id):
- """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
- chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
- media_id = int(id.split('_')[0])
- shortened_id = ''
- while media_id > 0:
- r = media_id % 64
- media_id = (media_id - r) // 64
- shortened_id = chrs[r] + shortened_id
- return shortened_id
-
def _real_extract(self, url):
- return {
- '_type': 'url_transparent',
- 'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/',
- 'ie_key': 'Instagram',
- }
+ video_id = _pk_to_id(self._match_id(url))
+ return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id)
class InstagramIE(InstagramBaseIE):
_VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+ _EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1']
_TESTS = [{
'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
'md5': '0d2da106a9d2631273e192b372806516',
@@ -246,7 +265,7 @@ class InstagramIE(InstagramBaseIE):
'title': 'Video by naomipq',
'description': 'md5:1f17f0ab29bd6fe2bfad705f58de3cb8',
'thumbnail': r're:^https?://.*\.jpg',
- 'duration': 0,
+ 'duration': 8.747,
'timestamp': 1371748545,
'upload_date': '20130620',
'uploader_id': '2815873',
@@ -256,27 +275,34 @@ class InstagramIE(InstagramBaseIE):
'comment_count': int,
'comments': list,
},
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
- # missing description
- 'url': 'https://www.instagram.com/p/BA-pQFBG8HZ/?taken-by=britneyspears',
+ # reel
+ 'url': 'https://www.instagram.com/reel/Chunk8-jurw/',
+ 'md5': 'f6d8277f74515fa3ff9f5791426e42b1',
'info_dict': {
- 'id': 'BA-pQFBG8HZ',
+ 'id': 'Chunk8-jurw',
'ext': 'mp4',
- 'title': 'Video by britneyspears',
+ 'title': 'Video by instagram',
+ 'description': 'md5:c9cde483606ed6f80fbe9283a6a2b290',
'thumbnail': r're:^https?://.*\.jpg',
- 'duration': 0,
- 'timestamp': 1453760977,
- 'upload_date': '20160125',
- 'uploader_id': '12246775',
- 'uploader': 'Britney Spears',
- 'channel': 'britneyspears',
+ 'duration': 5.016,
+ 'timestamp': 1661529231,
+ 'upload_date': '20220826',
+ 'uploader_id': '25025320',
+ 'uploader': 'Instagram',
+ 'channel': 'instagram',
'like_count': int,
'comment_count': int,
'comments': list,
},
- 'params': {
- 'skip_download': True,
- },
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
# multi video post
'url': 'https://www.instagram.com/p/BQ0eAlwhDrw/',
@@ -285,18 +311,24 @@ class InstagramIE(InstagramBaseIE):
'id': 'BQ0dSaohpPW',
'ext': 'mp4',
'title': 'Video 1',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}, {
'info_dict': {
'id': 'BQ0dTpOhuHT',
'ext': 'mp4',
'title': 'Video 2',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}, {
'info_dict': {
'id': 'BQ0dT7RBFeF',
'ext': 'mp4',
'title': 'Video 3',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}],
'info_dict': {
@@ -304,6 +336,10 @@ class InstagramIE(InstagramBaseIE):
'title': 'Post by instagram',
'description': 'md5:0f9203fc6a2ce4d228da5754bcf54957',
},
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
# IGTV
'url': 'https://www.instagram.com/tv/BkfuX9UB-eK/',
@@ -322,7 +358,11 @@ class InstagramIE(InstagramBaseIE):
'comment_count': int,
'comments': list,
'description': 'Meet Cass Hirst (@cass.fb), a fingerboarding pro who can perform tiny ollies and kickflips while blindfolded.',
- }
+ },
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
'url': 'https://instagram.com/p/-Cmh1cukG2/',
'only_matching': True,
@@ -340,59 +380,88 @@ class InstagramIE(InstagramBaseIE):
'only_matching': True,
}]
- @staticmethod
- def _extract_embed_url(webpage):
- mobj = re.search(
- r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1',
- webpage)
- if mobj:
- return mobj.group('url')
-
- blockquote_el = get_element_by_attribute(
- 'class', 'instagram-media', webpage)
- if blockquote_el is None:
- return
+ @classmethod
+ def _extract_embed_urls(cls, url, webpage):
+ res = tuple(super()._extract_embed_urls(url, webpage))
+ if res:
+ return res
- mobj = re.search(
- r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1', blockquote_el)
+ mobj = re.search(r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1',
+ get_element_by_attribute('class', 'instagram-media', webpage) or '')
if mobj:
- return mobj.group('link')
+ return [mobj.group('link')]
def _real_extract(self, url):
video_id, url = self._match_valid_url(url).group('id', 'url')
- webpage, urlh = self._download_webpage_handle(url, video_id)
- if 'www.instagram.com/accounts/login' in urlh.geturl():
- self.report_warning('Main webpage is locked behind the login page. '
- 'Retrying with embed webpage (Note that some metadata might be missing)')
- webpage = self._download_webpage(
- 'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage')
-
- shared_data = self._parse_json(
- self._search_regex(
- r'window\._sharedData\s*=\s*({.+?});',
- webpage, 'shared data', default='{}'),
- video_id, fatal=False)
- media = traverse_obj(
- shared_data,
- ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
- ('entry_data', 'PostPage', 0, 'media'),
- expected_type=dict)
-
- # _sharedData.entry_data.PostPage is empty when authenticated (see
- # https://github.com/ytdl-org/youtube-dl/pull/22880)
- if not media:
- additional_data = self._parse_json(
- self._search_regex(
- r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);',
- webpage, 'additional data', default='{}'),
- video_id, fatal=False)
- product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
- if product_item:
- return self._extract_product(product_item)
- media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}
-
- if not media and 'www.instagram.com/accounts/login' in urlh.geturl():
- self.raise_login_required('You need to log in to access this content')
+ media, webpage = {}, ''
+
+ if self._get_cookies(url).get('sessionid'):
+ info = traverse_obj(self._download_json(
+ f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/info/', video_id,
+ fatal=False, errnote='Video info extraction failed',
+ note='Downloading video info', headers=self._API_HEADERS), ('items', 0))
+ if info:
+ media.update(info)
+ return self._extract_product(media)
+
+ api_check = self._download_json(
+ f'{self._API_BASE_URL}/web/get_ruling_for_content/?content_type=MEDIA&target_id={_id_to_pk(video_id)}',
+ video_id, headers=self._API_HEADERS, fatal=False, note='Setting up session', errnote=False) or {}
+ csrf_token = self._get_cookies('https://www.instagram.com').get('csrftoken')
+
+ if not csrf_token:
+ self.report_warning('No csrf token set by Instagram API', video_id)
+ else:
+ csrf_token = csrf_token.value if api_check.get('status') == 'ok' else None
+ if not csrf_token:
+ self.report_warning('Instagram API is not granting access', video_id)
+
+ variables = {
+ 'shortcode': video_id,
+ 'child_comment_count': 3,
+ 'fetch_comment_count': 40,
+ 'parent_comment_count': 24,
+ 'has_threaded_comments': True,
+ }
+ general_info = self._download_json(
+ 'https://www.instagram.com/graphql/query/', video_id, fatal=False, errnote=False,
+ headers={
+ **self._API_HEADERS,
+ 'X-CSRFToken': csrf_token or '',
+ 'X-Requested-With': 'XMLHttpRequest',
+ 'Referer': url,
+ }, query={
+ 'query_hash': '9f8827793ef34641b2fb195d4d41151c',
+ 'variables': json.dumps(variables, separators=(',', ':')),
+ })
+ media.update(traverse_obj(general_info, ('data', 'shortcode_media')) or {})
+
+ if not general_info:
+ self.report_warning('General metadata extraction failed (some metadata might be missing).', video_id)
+ webpage, urlh = self._download_webpage_handle(url, video_id)
+ shared_data = self._search_json(
+ r'window\._sharedData\s*=', webpage, 'shared data', video_id, fatal=False) or {}
+
+ if shared_data and self._LOGIN_URL not in urlh.geturl():
+ media.update(traverse_obj(
+ shared_data, ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
+ ('entry_data', 'PostPage', 0, 'media'), expected_type=dict) or {})
+ else:
+ self.report_warning('Main webpage is locked behind the login page. Retrying with embed webpage (some metadata might be missing).')
+ webpage = self._download_webpage(
+ f'{url}/embed/', video_id, note='Downloading embed webpage', fatal=False)
+ additional_data = self._search_json(
+ r'window\.__additionalDataLoaded\s*\(\s*[^,]+,', webpage, 'additional data', video_id, fatal=False)
+ if not additional_data and not media:
+ self.raise_login_required('Requested content is not available, rate-limit reached or login required')
+
+ product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
+ if product_item:
+ media.update(product_item)
+ return self._extract_product(media)
+
+ media.update(traverse_obj(
+ additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {})
username = traverse_obj(media, ('owner', 'username')) or self._search_regex(
r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False)
@@ -412,7 +481,7 @@ class InstagramIE(InstagramBaseIE):
if nodes:
return self.playlist_result(
self._extract_nodes(nodes, True), video_id,
- format_field(username, template='Post by %s'), description)
+ format_field(username, None, 'Post by %s'), description)
video_url = self._og_search_video_url(webpage, secure=False)
@@ -424,7 +493,6 @@ class InstagramIE(InstagramBaseIE):
dash = traverse_obj(media, ('dash_info', 'video_dash_manifest'))
if dash:
formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash'))
- self._sort_formats(formats)
comment_data = traverse_obj(media, ('edge_media_to_parent_comment', 'edges'))
comments = [{
@@ -521,7 +589,7 @@ class InstagramPlaylistBaseIE(InstagramBaseIE):
except ExtractorError as e:
# if it's an error caused by a bad query, and there are
# more GIS templates to try, ignore it and keep trying
- if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
+ if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403:
if gis_tmpl != gis_tmpls[-1]:
continue
raise
@@ -631,41 +699,32 @@ class InstagramStoryIE(InstagramBaseIE):
def _real_extract(self, url):
username, story_id = self._match_valid_url(url).groups()
-
- story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1'
- story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={
- 'X-IG-App-ID': 936619743392459,
- 'X-ASBD-ID': 198387,
- 'X-IG-WWW-Claim': 0,
- 'X-Requested-With': 'XMLHttpRequest',
- 'Referer': url,
- })
- user_id = story_info['user']['id']
- highlight_title = traverse_obj(story_info, ('highlight', 'title'))
+ story_info = self._download_webpage(url, story_id)
+ user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False)
+ if not user_info:
+ self.raise_login_required('This content is unreachable')
+ user_id = user_info.get('id')
story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}'
- videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={
- 'X-IG-App-ID': 936619743392459,
- 'X-ASBD-ID': 198387,
- 'X-IG-WWW-Claim': 0,
- })['reels']
-
- full_name = traverse_obj(videos, ('user', 'full_name'))
-
- user_info = {}
- if not (username and username != 'highlights' and full_name):
- user_info = self._download_json(
- f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={
- 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)',
- }, note='Downloading user info')
+ videos = traverse_obj(self._download_json(
+ f'{self._API_BASE_URL}/feed/reels_media/?reel_ids={story_info_url}',
+ story_id, errnote=False, fatal=False, headers=self._API_HEADERS), 'reels')
+ if not videos:
+ self.raise_login_required('You need to log in to access this content')
- username = traverse_obj(user_info, ('user', 'username')) or username
- full_name = traverse_obj(user_info, ('user', 'full_name')) or full_name
+ full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name'))
+ story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title'))
+ if not story_title:
+ story_title = f'Story by {username}'
highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items'))
- return self.playlist_result([{
- **self._extract_product(highlight),
- 'title': f'Story by {username}',
- 'uploader': full_name,
- 'uploader_id': user_id,
- } for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title)
+ info_data = []
+ for highlight in highlights:
+ highlight_data = self._extract_product(highlight)
+ if highlight_data.get('formats'):
+ info_data.append({
+ **highlight_data,
+ 'uploader': full_name,
+ 'uploader_id': user_id,
+ })
+ return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)