aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/extractor/triller.py
diff options
context:
space:
mode:
Diffstat (limited to 'hypervideo_dl/extractor/triller.py')
-rw-r--r--hypervideo_dl/extractor/triller.py315
1 files changed, 175 insertions, 140 deletions
diff --git a/hypervideo_dl/extractor/triller.py b/hypervideo_dl/extractor/triller.py
index acd9e68..56e51fe 100644
--- a/hypervideo_dl/extractor/triller.py
+++ b/hypervideo_dl/extractor/triller.py
@@ -1,15 +1,21 @@
import itertools
import json
+import re
from .common import InfoExtractor
+from ..networking import HEADRequest
from ..utils import (
ExtractorError,
+ UnsupportedError,
+ determine_ext,
int_or_none,
+ parse_resolution,
str_or_none,
traverse_obj,
- unified_strdate,
unified_timestamp,
url_basename,
+ urljoin,
+ url_or_none,
)
@@ -22,25 +28,22 @@ class TrillerBaseIE(InfoExtractor):
if self._API_HEADERS.get('Authorization'):
return
- user_check = self._download_json(
+ headers = {**self._API_HEADERS, 'Content-Type': 'application/json'}
+ user_check = traverse_obj(self._download_json(
f'{self._API_BASE_URL}/api/user/is-valid-username', None, note='Checking username',
- fatal=False, expected_status=400, headers={
- 'Content-Type': 'application/json',
- 'Origin': 'https://triller.co',
- }, data=json.dumps({'username': username}, separators=(',', ':')).encode('utf-8'))
- if user_check.get('status'): # endpoint returns "status":false if username exists
+ fatal=False, expected_status=400, headers=headers,
+ data=json.dumps({'username': username}, separators=(',', ':')).encode()), 'status')
+
+ if user_check: # endpoint returns `"status":false` if username exists
raise ExtractorError('Unable to login: Invalid username', expected=True)
- credentials = {
- 'username': username,
- 'password': password,
- }
login = self._download_json(
- f'{self._API_BASE_URL}/user/auth', None, note='Logging in',
- fatal=False, expected_status=400, headers={
- 'Content-Type': 'application/json',
- 'Origin': 'https://triller.co',
- }, data=json.dumps(credentials, separators=(',', ':')).encode('utf-8'))
+ f'{self._API_BASE_URL}/user/auth', None, note='Logging in', fatal=False,
+ expected_status=400, headers=headers, data=json.dumps({
+ 'username': username,
+ 'password': password,
+ }, separators=(',', ':')).encode()) or {}
+
if not login.get('auth_token'):
if login.get('error') == 1008:
raise ExtractorError('Unable to login: Incorrect password', expected=True)
@@ -55,100 +58,93 @@ class TrillerBaseIE(InfoExtractor):
headers=self._API_HEADERS, query={'limit': limit}) or {}
if not comment_info.get('comments'):
return
- for comment_dict in comment_info['comments']:
- yield {
- 'author': traverse_obj(comment_dict, ('author', 'username')),
- 'author_id': traverse_obj(comment_dict, ('author', 'user_id')),
- 'id': comment_dict.get('id'),
- 'text': comment_dict.get('body'),
- 'timestamp': unified_timestamp(comment_dict.get('timestamp')),
- }
+ yield from traverse_obj(comment_info, ('comments', ..., {
+ 'id': ('id', {str_or_none}),
+ 'text': 'body',
+ 'author': ('author', 'username'),
+ 'author_id': ('author', 'user_id'),
+ 'timestamp': ('timestamp', {unified_timestamp}),
+ }))
- def _check_user_info(self, user_info):
- if not user_info:
- self.report_warning('Unable to extract user info')
- elif user_info.get('private') and not user_info.get('followed_by_me'):
- raise ExtractorError('This video is private', expected=True)
- elif traverse_obj(user_info, 'blocked_by_user', 'blocking_user'):
- raise ExtractorError('The author of the video is blocked', expected=True)
- return user_info
+ def _parse_video_info(self, video_info, username, user_id, display_id=None):
+ video_id = str(video_info['id'])
+ display_id = display_id or video_info.get('video_uuid')
+
+ if traverse_obj(video_info, (
+ None, ('transcoded_url', 'video_url', 'stream_url', 'audio_url'),
+ {lambda x: re.search(r'/copyright/', x)}), get_all=False):
+ self.raise_no_formats('This video has been removed due to licensing restrictions', expected=True)
- def _parse_video_info(self, video_info, username, user_info=None):
- video_uuid = video_info.get('video_uuid')
- video_id = video_info.get('id')
+ def format_info(url):
+ return {
+ 'url': url,
+ 'ext': determine_ext(url),
+ 'format_id': url_basename(url).split('.')[0],
+ }
formats = []
- video_url = traverse_obj(video_info, 'video_url', 'stream_url')
- if video_url:
- formats.append({
- 'url': video_url,
- 'ext': 'mp4',
- 'vcodec': 'h264',
- 'width': video_info.get('width'),
- 'height': video_info.get('height'),
- 'format_id': url_basename(video_url).split('.')[0],
- 'filesize': video_info.get('filesize'),
- })
- video_set = video_info.get('video_set') or []
- for video in video_set:
- resolution = video.get('resolution') or ''
+
+ if determine_ext(video_info.get('transcoded_url')) == 'm3u8':
+ formats.extend(self._extract_m3u8_formats(
+ video_info['transcoded_url'], video_id, 'mp4', m3u8_id='hls', fatal=False))
+
+ for video in traverse_obj(video_info, ('video_set', lambda _, v: url_or_none(v['url']))):
formats.append({
- 'url': video['url'],
- 'ext': 'mp4',
+ **format_info(video['url']),
+ **parse_resolution(video.get('resolution')),
'vcodec': video.get('codec'),
'vbr': int_or_none(video.get('bitrate'), 1000),
- 'width': int_or_none(resolution.split('x')[0]),
- 'height': int_or_none(resolution.split('x')[1]),
- 'format_id': url_basename(video['url']).split('.')[0],
})
- audio_url = video_info.get('audio_url')
- if audio_url:
+
+ video_url = traverse_obj(video_info, 'video_url', 'stream_url', expected_type=url_or_none)
+ if video_url:
formats.append({
- 'url': audio_url,
- 'ext': 'm4a',
- 'format_id': url_basename(audio_url).split('.')[0],
+ **format_info(video_url),
+ 'vcodec': 'h264',
+ **traverse_obj(video_info, {
+ 'width': 'width',
+ 'height': 'height',
+ 'filesize': 'filesize',
+ }, expected_type=int_or_none),
})
- manifest_url = video_info.get('transcoded_url')
- if manifest_url:
- formats.extend(self._extract_m3u8_formats(
- manifest_url, video_id, 'mp4', entry_protocol='m3u8_native',
- m3u8_id='hls', fatal=False))
-
- comment_count = int_or_none(video_info.get('comment_count'))
+ audio_url = url_or_none(video_info.get('audio_url'))
+ if audio_url:
+ formats.append(format_info(audio_url))
- user_info = user_info or traverse_obj(video_info, 'user', default={})
+ comment_count = traverse_obj(video_info, ('comment_count', {int_or_none}))
return {
- 'id': str_or_none(video_id) or video_uuid,
- 'title': video_info.get('description') or f'Video by {username}',
- 'thumbnail': video_info.get('thumbnail_url'),
- 'description': video_info.get('description'),
- 'uploader': str_or_none(username),
- 'uploader_id': str_or_none(user_info.get('user_id')),
- 'creator': str_or_none(user_info.get('name')),
- 'timestamp': unified_timestamp(video_info.get('timestamp')),
- 'upload_date': unified_strdate(video_info.get('timestamp')),
- 'duration': int_or_none(video_info.get('duration')),
- 'view_count': int_or_none(video_info.get('play_count')),
- 'like_count': int_or_none(video_info.get('likes_count')),
- 'artist': str_or_none(video_info.get('song_artist')),
- 'track': str_or_none(video_info.get('song_title')),
- 'webpage_url': f'https://triller.co/@{username}/video/{video_uuid}',
+ 'id': video_id,
+ 'display_id': display_id,
+ 'uploader': username,
+ 'uploader_id': user_id or traverse_obj(video_info, ('user', 'user_id', {str_or_none})),
+ 'webpage_url': urljoin(f'https://triller.co/@{username}/video/', display_id),
'uploader_url': f'https://triller.co/@{username}',
'extractor_key': TrillerIE.ie_key(),
'extractor': TrillerIE.IE_NAME,
'formats': formats,
'comment_count': comment_count,
'__post_extractor': self.extract_comments(video_id, comment_count),
+ **traverse_obj(video_info, {
+ 'title': ('description', {lambda x: x.replace('\r\n', ' ')}),
+ 'description': 'description',
+ 'creator': ((('user'), ('users', lambda _, v: str(v['user_id']) == user_id)), 'name'),
+ 'thumbnail': ('thumbnail_url', {url_or_none}),
+ 'timestamp': ('timestamp', {unified_timestamp}),
+ 'duration': ('duration', {int_or_none}),
+ 'view_count': ('play_count', {int_or_none}),
+ 'like_count': ('likes_count', {int_or_none}),
+ 'artist': 'song_artist',
+ 'track': 'song_title',
+ }, get_all=False),
}
class TrillerIE(TrillerBaseIE):
_VALID_URL = r'''(?x)
https?://(?:www\.)?triller\.co/
- @(?P<username>[\w\._]+)/video/
- (?P<id>[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12})
+ @(?P<username>[\w.]+)/video/(?P<id>[\da-f]{8}-(?:[\da-f]{4}-){3}[\da-f]{12})
'''
_TESTS = [{
'url': 'https://triller.co/@theestallion/video/2358fcd7-3df2-4c77-84c8-1d091610a6cf',
@@ -165,16 +161,14 @@ class TrillerIE(TrillerBaseIE):
'timestamp': 1660598222,
'upload_date': '20220815',
'duration': 47,
- 'height': 3840,
- 'width': 2160,
'view_count': int,
'like_count': int,
'artist': 'Megan Thee Stallion',
'track': 'Her',
- 'webpage_url': 'https://triller.co/@theestallion/video/2358fcd7-3df2-4c77-84c8-1d091610a6cf',
'uploader_url': 'https://triller.co/@theestallion',
'comment_count': int,
- }
+ },
+ 'skip': 'This video has been removed due to licensing restrictions',
}, {
'url': 'https://triller.co/@charlidamelio/video/46c6fcfa-aa9e-4503-a50c-68444f44cddc',
'md5': '874055f462af5b0699b9dbb527a505a0',
@@ -182,6 +176,7 @@ class TrillerIE(TrillerBaseIE):
'id': '71621339',
'ext': 'mp4',
'title': 'md5:4c91ea82760fe0fffb71b8c3aa7295fc',
+ 'display_id': '46c6fcfa-aa9e-4503-a50c-68444f44cddc',
'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
'description': 'md5:4c91ea82760fe0fffb71b8c3aa7295fc',
'uploader': 'charlidamelio',
@@ -190,59 +185,73 @@ class TrillerIE(TrillerBaseIE):
'timestamp': 1660773354,
'upload_date': '20220817',
'duration': 16,
- 'height': 1920,
- 'width': 1080,
'view_count': int,
'like_count': int,
'artist': 'Dixie',
'track': 'Someone to Blame',
- 'webpage_url': 'https://triller.co/@charlidamelio/video/46c6fcfa-aa9e-4503-a50c-68444f44cddc',
'uploader_url': 'https://triller.co/@charlidamelio',
'comment_count': int,
- }
+ },
+ }, {
+ 'url': 'https://triller.co/@theestallion/video/07f35f38-1f51-48e2-8c5f-f7a8e829988f',
+ 'md5': 'af7b3553e4b8bfca507636471ee2eb41',
+ 'info_dict': {
+ 'id': '71837829',
+ 'ext': 'mp4',
+ 'title': 'UNGRATEFUL VIDEO OUT NOW ๐Ÿ‘๐Ÿพ๐Ÿ‘๐Ÿพ๐Ÿ‘๐Ÿพ ๐Ÿ’™๐Ÿ’™ link my bio #womeninhiphop',
+ 'display_id': '07f35f38-1f51-48e2-8c5f-f7a8e829988f',
+ 'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
+ 'description': 'UNGRATEFUL VIDEO OUT NOW ๐Ÿ‘๐Ÿพ๐Ÿ‘๐Ÿพ๐Ÿ‘๐Ÿพ ๐Ÿ’™๐Ÿ’™ link my bio\r\n #womeninhiphop',
+ 'uploader': 'theestallion',
+ 'uploader_id': '18992236',
+ 'creator': 'Megan Thee Stallion',
+ 'timestamp': 1662486178,
+ 'upload_date': '20220906',
+ 'duration': 30,
+ 'view_count': int,
+ 'like_count': int,
+ 'artist': 'Unknown',
+ 'track': 'Unknown',
+ 'uploader_url': 'https://triller.co/@theestallion',
+ 'comment_count': int,
+ },
}]
def _real_extract(self, url):
- username, video_uuid = self._match_valid_url(url).group('username', 'id')
+ username, display_id = self._match_valid_url(url).group('username', 'id')
- video_info = traverse_obj(self._download_json(
- f'{self._API_BASE_URL}/api/videos/{video_uuid}',
- video_uuid, note='Downloading video info API JSON',
- errnote='Unable to download video info API JSON',
- headers=self._API_HEADERS), ('videos', 0))
- if not video_info:
- raise ExtractorError('No video info found in API response')
+ video_info = self._download_json(
+ f'{self._API_BASE_URL}/api/videos/{display_id}', display_id,
+ headers=self._API_HEADERS)['videos'][0]
- user_info = self._check_user_info(video_info.get('user') or {})
- return self._parse_video_info(video_info, username, user_info)
+ return self._parse_video_info(video_info, username, None, display_id)
class TrillerUserIE(TrillerBaseIE):
- _VALID_URL = r'https?://(?:www\.)?triller\.co/@(?P<id>[\w\._]+)/?(?:$|[#?])'
+ _VALID_URL = r'https?://(?:www\.)?triller\.co/@(?P<id>[\w.]+)/?(?:$|[#?])'
_TESTS = [{
- # first videos request only returns 2 videos
'url': 'https://triller.co/@theestallion',
- 'playlist_mincount': 9,
+ 'playlist_mincount': 12,
'info_dict': {
'id': '18992236',
'title': 'theestallion',
'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
- }
+ },
}, {
'url': 'https://triller.co/@charlidamelio',
- 'playlist_mincount': 25,
+ 'playlist_mincount': 150,
'info_dict': {
'id': '1875551',
'title': 'charlidamelio',
'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
- }
+ },
}]
def _real_initialize(self):
if not self._API_HEADERS.get('Authorization'):
guest = self._download_json(
- f'{self._API_BASE_URL}/user/create_guest',
- None, note='Creating guest session', data=b'', headers=self._API_HEADERS, query={
+ f'{self._API_BASE_URL}/user/create_guest', None,
+ note='Creating guest session', data=b'', headers=self._API_HEADERS, query={
'platform': 'Web',
'app_version': '',
})
@@ -251,44 +260,70 @@ class TrillerUserIE(TrillerBaseIE):
self._API_HEADERS['Authorization'] = f'Bearer {guest["auth_token"]}'
- def _extract_video_list(self, username, user_id, limit=6):
- query = {
- 'limit': limit,
- }
+ def _entries(self, username, user_id, limit=6):
+ query = {'limit': limit}
for page in itertools.count(1):
- for retry in self.RetryManager():
- try:
- video_list = self._download_json(
- f'{self._API_BASE_URL}/api/users/{user_id}/videos',
- username, note=f'Downloading user video list page {page}',
- errnote='Unable to download user video list', headers=self._API_HEADERS,
- query=query)
- except ExtractorError as e:
- if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
- retry.error = e
- continue
- raise
- if not video_list.get('videos'):
- break
- yield from video_list['videos']
- query['before_time'] = traverse_obj(video_list, ('videos', -1, 'timestamp'))
+ videos = self._download_json(
+ f'{self._API_BASE_URL}/api/users/{user_id}/videos',
+ username, note=f'Downloading user video list page {page}',
+ headers=self._API_HEADERS, query=query)
+
+ for video in traverse_obj(videos, ('videos', ...)):
+ yield self._parse_video_info(video, username, user_id)
+
+ query['before_time'] = traverse_obj(videos, ('videos', -1, 'timestamp'))
if not query['before_time']:
break
- def _entries(self, videos, username, user_info):
- for video in videos:
- yield self._parse_video_info(video, username, user_info)
-
def _real_extract(self, url):
username = self._match_id(url)
- user_info = self._check_user_info(self._download_json(
+
+ user_info = traverse_obj(self._download_json(
f'{self._API_BASE_URL}/api/users/by_username/{username}',
- username, note='Downloading user info',
- errnote='Failed to download user info', headers=self._API_HEADERS).get('user', {}))
+ username, note='Downloading user info', headers=self._API_HEADERS), ('user', {dict})) or {}
+
+ if user_info.get('private') and user_info.get('followed_by_me') not in (True, 'true'):
+ raise ExtractorError('This user profile is private', expected=True)
+ elif traverse_obj(user_info, (('blocked_by_user', 'blocking_user'), {bool}), get_all=False):
+ raise ExtractorError('The author of the video is blocked', expected=True)
user_id = str_or_none(user_info.get('user_id'))
- videos = self._extract_video_list(username, user_id)
- thumbnail = user_info.get('avatar_url')
+ if not user_id:
+ raise ExtractorError('Unable to extract user ID')
return self.playlist_result(
- self._entries(videos, username, user_info), user_id, username, thumbnail=thumbnail)
+ self._entries(username, user_id), user_id, username, thumbnail=user_info.get('avatar_url'))
+
+
+class TrillerShortIE(InfoExtractor):
+ _VALID_URL = r'https?://v\.triller\.co/(?P<id>\w+)'
+ _TESTS = [{
+ 'url': 'https://v.triller.co/WWZNWk',
+ 'md5': '5eb8dc2c971bd8cd794ec9e8d5e9d101',
+ 'info_dict': {
+ 'id': '66210052',
+ 'ext': 'mp4',
+ 'title': 'md5:2dfc89d154cd91a4a18cd9582ba03e16',
+ 'display_id': 'f4480e1f-fb4e-45b9-a44c-9e6c679ce7eb',
+ 'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
+ 'description': 'md5:2dfc89d154cd91a4a18cd9582ba03e16',
+ 'uploader': 'statefairent',
+ 'uploader_id': '487545193',
+ 'creator': 'Officialย Summerย Fairย ofย LA',
+ 'timestamp': 1629655457,
+ 'upload_date': '20210822',
+ 'duration': 19,
+ 'view_count': int,
+ 'like_count': int,
+ 'artist': 'Unknown',
+ 'track': 'Unknown',
+ 'uploader_url': 'https://triller.co/@statefairent',
+ 'comment_count': int,
+ },
+ }]
+
+ def _real_extract(self, url):
+ real_url = self._request_webpage(HEADRequest(url), self._match_id(url)).url
+ if self.suitable(real_url): # Prevent infinite loop in case redirect fails
+ raise UnsupportedError(real_url)
+ return self.url_result(real_url)