From 0fd6661edb7e671eb7b131de12fa89bb85a6cbaf Mon Sep 17 00:00:00 2001 From: MinePlayersPE Date: Sun, 12 Sep 2021 13:21:59 +0700 Subject: [PATCH] [TikTokUser] Fix extractor using mobile API (#925) and misc cleanup Closes #859 Authored by: MinePlayersPE, llacb47 --- yt_dlp/extractor/tiktok.py | 417 +++++++++++++++++++++---------------- 1 file changed, 232 insertions(+), 185 deletions(-) diff --git a/yt_dlp/extractor/tiktok.py b/yt_dlp/extractor/tiktok.py index 953ff05b6..4b0efd4a3 100644 --- a/yt_dlp/extractor/tiktok.py +++ b/yt_dlp/extractor/tiktok.py @@ -5,6 +5,7 @@ import itertools import random import string import time +import json from .common import InfoExtractor from ..utils import ( @@ -17,7 +18,189 @@ from ..utils import ( ) -class TikTokIE(InfoExtractor): +class TikTokBaseIE(InfoExtractor): + _APP_VERSION = '20.9.3' + _MANIFEST_APP_VERSION = '291' + QUALITIES = ('360p', '540p', '720p') + + def _call_api(self, ep, query, video_id, fatal=True, + note='Downloading API JSON', errnote='Unable to download API page'): + real_query = { + **query, + 'version_name': self._APP_VERSION, + 'version_code': self._MANIFEST_APP_VERSION, + 'build_number': self._APP_VERSION, + 'manifest_version_code': self._MANIFEST_APP_VERSION, + 'update_version_code': self._MANIFEST_APP_VERSION, + 'openudid': ''.join(random.choice('0123456789abcdef') for i in range(16)), + 'uuid': ''.join([random.choice(string.digits) for num in range(16)]), + '_rticket': int(time.time() * 1000), + 'ts': int(time.time()), + 'device_brand': 'Google', + 'device_type': 'Pixel 4', + 'device_platform': 'android', + 'resolution': '1080*1920', + 'dpi': 420, + 'os_version': '10', + 'os_api': '29', + 'carrier_region': 'US', + 'sys_region': 'US', + 'region': 'US', + 'app_name': 'trill', + 'app_language': 'en', + 'language': 'en', + 'timezone_name': 'America/New_York', + 'timezone_offset': '-14400', + 'channel': 'googleplay', + 'ac': 'wifi', + 'mcc_mnc': '310260', + 'is_my_cn': 0, + 'aid': 1180, + 'ssmix': 'a', + 'as': 'a1qwert123', + 'cp': 'cbfhckdckkde1', + } + self._set_cookie('.tiktokv.com', 'odin_tt', ''.join(random.choice('0123456789abcdef') for i in range(160))) + return self._download_json( + 'https://api-t2.tiktokv.com/aweme/v1/%s/' % ep, video_id=video_id, + fatal=fatal, note=note, errnote=errnote, headers={ + 'User-Agent': f'com.ss.android.ugc.trill/{self._MANIFEST_APP_VERSION} (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)', + 'Accept': 'application/json', + }, query=real_query) + + def _parse_aweme_video(self, aweme_detail): + aweme_id = aweme_detail['aweme_id'] + video_info = aweme_detail['video'] + + def parse_url_key(url_key): + format_id, codec, res, bitrate = self._search_regex( + r'v[^_]+_(?P(?P[^_]+)_(?P\d+p)_(?P\d+))', url_key, + 'url key', default=(None, None, None, None), group=('id', 'codec', 'res', 'bitrate')) + if not format_id: + return {}, None + return { + 'format_id': format_id, + 'vcodec': 'h265' if codec == 'bytevc1' else codec, + 'tbr': int_or_none(bitrate, scale=1000) or None, + 'quality': qualities(self.QUALITIES)(res), + }, res + + known_resolutions = {} + + def extract_addr(addr, add_meta={}): + parsed_meta, res = parse_url_key(addr.get('url_key', '')) + if res: + known_resolutions.setdefault(res, {}).setdefault('height', add_meta.get('height')) + known_resolutions[res].setdefault('width', add_meta.get('width')) + parsed_meta.update(known_resolutions.get(res, {})) + add_meta.setdefault('height', int_or_none(res[:-1])) + return [{ + 'url': url, + 'filesize': int_or_none(addr.get('data_size')), + 'ext': 'mp4', + 'acodec': 'aac', + 'source_preference': -2 if 'aweme/v1' in url else -1, # Downloads from API might get blocked + **add_meta, **parsed_meta, + 'format_note': ' '.join(filter(None, ( + add_meta.get('format_note'), '(API)' if 'aweme/v1' in url else ''))) + } for url in addr.get('url_list') or []] + + # Hack: Add direct video links first to prioritize them when removing duplicate formats + formats = [] + if video_info.get('play_addr'): + formats.extend(extract_addr(video_info['play_addr'], { + 'format_id': 'play_addr', + 'format_note': 'Direct video', + 'vcodec': 'h265' if traverse_obj( + video_info, 'is_bytevc1', 'is_h265') else 'h264', # Always h264? + 'width': video_info.get('width'), + 'height': video_info.get('height'), + })) + if video_info.get('download_addr'): + formats.extend(extract_addr(video_info['download_addr'], { + 'format_id': 'download_addr', + 'format_note': 'Download video%s' % (', watermarked' if video_info.get('has_watermark') else ''), + 'vcodec': 'h264', + 'width': video_info.get('width'), + 'height': video_info.get('height'), + 'preference': -2 if video_info.get('has_watermark') else -1, + })) + if video_info.get('play_addr_h264'): + formats.extend(extract_addr(video_info['play_addr_h264'], { + 'format_id': 'play_addr_h264', + 'format_note': 'Direct video', + 'vcodec': 'h264', + })) + if video_info.get('play_addr_bytevc1'): + formats.extend(extract_addr(video_info['play_addr_bytevc1'], { + 'format_id': 'play_addr_bytevc1', + 'format_note': 'Direct video', + 'vcodec': 'h265', + })) + + for bitrate in video_info.get('bit_rate', []): + if bitrate.get('play_addr'): + formats.extend(extract_addr(bitrate['play_addr'], { + 'format_id': bitrate.get('gear_name'), + 'format_note': 'Playback video', + 'tbr': try_get(bitrate, lambda x: x['bit_rate'] / 1000), + 'vcodec': 'h265' if traverse_obj( + bitrate, 'is_bytevc1', 'is_h265') else 'h264', + })) + + self._remove_duplicate_formats(formats) + self._sort_formats(formats, ('quality', 'codec', 'size', 'br')) + + thumbnails = [] + for cover_id in ('cover', 'ai_dynamic_cover', 'animated_cover', 'ai_dynamic_cover_bak', + 'origin_cover', 'dynamic_cover'): + cover = video_info.get(cover_id) + if cover: + for cover_url in cover['url_list']: + thumbnails.append({ + 'id': cover_id, + 'url': cover_url, + }) + + stats_info = aweme_detail.get('statistics', {}) + author_info = aweme_detail.get('author', {}) + music_info = aweme_detail.get('music', {}) + user_id = str_or_none(author_info.get('nickname')) + + contained_music_track = traverse_obj( + music_info, ('matched_song', 'title'), ('matched_pgc_sound', 'title'), expected_type=str) + contained_music_author = traverse_obj( + music_info, ('matched_song', 'author'), ('matched_pgc_sound', 'author'), 'author', expected_type=str) + + is_generic_og_trackname = music_info.get('is_original_sound') and music_info.get('title') == 'original sound - %s' % music_info.get('owner_handle') + if is_generic_og_trackname: + music_track, music_author = contained_music_track or 'original sound', contained_music_author + else: + music_track, music_author = music_info.get('title'), music_info.get('author') + + return { + 'id': aweme_id, + 'title': aweme_detail['desc'], + 'description': aweme_detail['desc'], + 'view_count': int_or_none(stats_info.get('play_count')), + 'like_count': int_or_none(stats_info.get('digg_count')), + 'repost_count': int_or_none(stats_info.get('share_count')), + 'comment_count': int_or_none(stats_info.get('comment_count')), + 'uploader': str_or_none(author_info.get('unique_id')), + 'creator': user_id, + 'uploader_id': str_or_none(author_info.get('uid')), + 'uploader_url': f'https://www.tiktok.com/@{user_id}' if user_id else None, + 'track': music_track, + 'album': str_or_none(music_info.get('album')) or None, + 'artist': music_author, + 'timestamp': int_or_none(aweme_detail.get('create_time')), + 'formats': formats, + 'thumbnails': thumbnails, + 'duration': int_or_none(traverse_obj(video_info, 'duration', ('download_addr', 'duration')), scale=1000) + } + + +class TikTokIE(TikTokBaseIE): _VALID_URL = r'https?://www\.tiktok\.com/@[\w\.-]+/video/(?P\d+)' _TESTS = [{ @@ -71,9 +254,6 @@ class TikTokIE(InfoExtractor): 'url': 'https://www.tiktok.com/@MS4wLjABAAAAAR29F6J2Ktu0Daw03BJyXPNoRQ-W7U5a0Mn3lVCq2rQhjOd_WNLclHUoFgwX8Eno/video/6932675057474981122', 'only_matching': True, }] - _APP_VERSION = '20.9.3' - _MANIFEST_APP_VERSION = '291' - QUALITIES = ('360p', '540p', '720p') def _extract_aweme(self, props_data, webpage, url): video_info = try_get( @@ -126,175 +306,9 @@ class TikTokIE(InfoExtractor): } def _extract_aweme_app(self, aweme_id): - query = { - 'aweme_id': aweme_id, - 'version_name': self._APP_VERSION, - 'version_code': self._MANIFEST_APP_VERSION, - 'build_number': self._APP_VERSION, - 'manifest_version_code': self._MANIFEST_APP_VERSION, - 'update_version_code': self._MANIFEST_APP_VERSION, - 'openudid': ''.join(random.choice('0123456789abcdef') for i in range(16)), - 'uuid': ''.join([random.choice(string.digits) for num in range(16)]), - '_rticket': int(time.time() * 1000), - 'ts': int(time.time()), - 'device_brand': 'Google', - 'device_type': 'Pixel 4', - 'device_platform': 'android', - 'resolution': '1080*1920', - 'dpi': 420, - 'os_version': '10', - 'os_api': '29', - 'carrier_region': 'US', - 'sys_region': 'US', - 'region': 'US', - 'app_name': 'trill', - 'app_language': 'en', - 'language': 'en', - 'timezone_name': 'America/New_York', - 'timezone_offset': '-14400', - 'channel': 'googleplay', - 'ac': 'wifi', - 'mcc_mnc': '310260', - 'is_my_cn': 0, - 'aid': 1180, - 'ssmix': 'a', - 'as': 'a1qwert123', - 'cp': 'cbfhckdckkde1', - } - - self._set_cookie('.tiktokv.com', 'odin_tt', ''.join(random.choice('0123456789abcdef') for i in range(160))) - - aweme_detail = self._download_json( - 'https://api-t2.tiktokv.com/aweme/v1/aweme/detail/', aweme_id, - 'Downloading video details', 'Unable to download video details', - headers={ - 'User-Agent': f'com.ss.android.ugc.trill/{self._MANIFEST_APP_VERSION} (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)', - }, query=query)['aweme_detail'] - video_info = aweme_detail['video'] - - def parse_url_key(url_key): - format_id, codec, res, bitrate = self._search_regex( - r'v[^_]+_(?P(?P[^_]+)_(?P\d+p)_(?P\d+))', url_key, - 'url key', default=(None, None, None, None), group=('id', 'codec', 'res', 'bitrate')) - if not format_id: - return {}, None - return { - 'format_id': format_id, - 'vcodec': 'h265' if codec == 'bytevc1' else codec, - 'tbr': int_or_none(bitrate, scale=1000) or None, - 'quality': qualities(self.QUALITIES)(res), - }, res - - known_resolutions = {} - - def extract_addr(addr, add_meta={}): - parsed_meta, res = parse_url_key(addr.get('url_key', '')) - if res: - known_resolutions.setdefault(res, {}).setdefault('height', add_meta.get('height')) - known_resolutions[res].setdefault('width', add_meta.get('width')) - parsed_meta.update(known_resolutions.get(res, {})) - add_meta.setdefault('height', int_or_none(res[:-1])) - return [{ - 'url': url, - 'filesize': int_or_none(addr.get('data_size')), - 'ext': 'mp4', - 'acodec': 'aac', - **add_meta, **parsed_meta - } for url in addr.get('url_list') or []] - - # Hack: Add direct video links first to prioritize them when removing duplicate formats - formats = [] - if video_info.get('play_addr'): - formats.extend(extract_addr(video_info['play_addr'], { - 'format_id': 'play_addr', - 'format_note': 'Direct video', - 'vcodec': 'h265' if traverse_obj( - video_info, 'is_bytevc1', 'is_h265') else 'h264', # Always h264? - 'width': video_info.get('width'), - 'height': video_info.get('height'), - })) - if video_info.get('download_addr'): - formats.extend(extract_addr(video_info['download_addr'], { - 'format_id': 'download_addr', - 'format_note': 'Download video%s' % (', watermarked' if video_info.get('has_watermark') else ''), - 'vcodec': 'h264', - 'width': video_info.get('width'), - 'height': video_info.get('height'), - 'source_preference': -2 if video_info.get('has_watermark') else -1, - })) - if video_info.get('play_addr_h264'): - formats.extend(extract_addr(video_info['play_addr_h264'], { - 'format_id': 'play_addr_h264', - 'format_note': 'Direct video', - 'vcodec': 'h264', - })) - if video_info.get('play_addr_bytevc1'): - formats.extend(extract_addr(video_info['play_addr_bytevc1'], { - 'format_id': 'play_addr_bytevc1', - 'format_note': 'Direct video', - 'vcodec': 'h265', - })) - - for bitrate in video_info.get('bit_rate', []): - if bitrate.get('play_addr'): - formats.extend(extract_addr(bitrate['play_addr'], { - 'format_id': bitrate.get('gear_name'), - 'format_note': 'Playback video', - 'tbr': try_get(bitrate, lambda x: x['bit_rate'] / 1000), - 'vcodec': 'h265' if traverse_obj( - bitrate, 'is_bytevc1', 'is_h265') else 'h264', - })) - - self._remove_duplicate_formats(formats) - self._sort_formats(formats, ('quality', 'source', 'codec', 'size', 'br')) - - thumbnails = [] - for cover_id in ('cover', 'ai_dynamic_cover', 'animated_cover', 'ai_dynamic_cover_bak', - 'origin_cover', 'dynamic_cover'): - cover = video_info.get(cover_id) - if cover: - for cover_url in cover['url_list']: - thumbnails.append({ - 'id': cover_id, - 'url': cover_url, - }) - - stats_info = aweme_detail.get('statistics', {}) - author_info = aweme_detail.get('author', {}) - music_info = aweme_detail.get('music', {}) - user_id = str_or_none(author_info.get('nickname')) - - contained_music_track = traverse_obj( - music_info, ('matched_song', 'title'), ('matched_pgc_sound', 'title'), expected_type=str) - contained_music_author = traverse_obj( - music_info, ('matched_song', 'author'), ('matched_pgc_sound', 'author'), 'author', expected_type=str) - - is_generic_og_trackname = music_info.get('is_original_sound') and music_info.get('title') == 'original sound - %s' % music_info.get('owner_handle') - if is_generic_og_trackname: - music_track, music_author = contained_music_track or 'original sound', contained_music_author - else: - music_track, music_author = music_info.get('title'), music_info.get('author') - - return { - 'id': aweme_id, - 'title': aweme_detail['desc'], - 'description': aweme_detail['desc'], - 'view_count': int_or_none(stats_info.get('play_count')), - 'like_count': int_or_none(stats_info.get('digg_count')), - 'repost_count': int_or_none(stats_info.get('share_count')), - 'comment_count': int_or_none(stats_info.get('comment_count')), - 'uploader': str_or_none(author_info.get('unique_id')), - 'creator': user_id, - 'uploader_id': str_or_none(author_info.get('uid')), - 'uploader_url': f'https://www.tiktok.com/@{user_id}' if user_id else None, - 'track': music_track, - 'album': str_or_none(music_info.get('album')) or None, - 'artist': music_author, - 'timestamp': int_or_none(aweme_detail.get('create_time')), - 'formats': formats, - 'thumbnails': thumbnails, - 'duration': int_or_none(traverse_obj(video_info, 'duration', ('download_addr', 'duration')), scale=1000) - } + aweme_detail = self._call_api('aweme/detail', {'aweme_id': aweme_id}, aweme_id, + note='Downloading video details', errnote='Unable to download video details')['aweme_detail'] + return self._parse_aweme_video(aweme_detail) def _real_extract(self, url): video_id = self._match_id(url) @@ -323,38 +337,33 @@ class TikTokIE(InfoExtractor): raise ExtractorError('Video not available', video_id=video_id) -class TikTokUserIE(InfoExtractor): +class TikTokUserIE(TikTokBaseIE): IE_NAME = 'tiktok:user' - _VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P[\w\._]+)/?(?:$|[#?])' + _VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P[\w\.-]+)/?(?:$|[#?])' _TESTS = [{ 'url': 'https://tiktok.com/@corgibobaa?lang=en', 'playlist_mincount': 45, 'info_dict': { 'id': '6935371178089399301', }, - 'skip': 'Cookies (not necessarily logged in) are needed.' }, { 'url': 'https://www.tiktok.com/@meme', 'playlist_mincount': 593, 'info_dict': { 'id': '79005827461758976', }, - 'skip': 'Cookies (not necessarily logged in) are needed.' }] - def _entries(self, url, user_id): - webpage = self._download_webpage(url, user_id) - own_id = self._search_regex(r'\"id\":\"(?P\d+)', webpage, user_id, default=None) - if not own_id: - raise ExtractorError('Cookies (not necessarily logged in) are needed.', expected=True) - secuid = self._search_regex(r'\"secUid\":\"(?P[^\"]+)', webpage, user_id) + r''' # TODO: Fix by adding _signature to api_url + def _entries(self, webpage, user_id, username): + secuid = self._search_regex(r'\"secUid\":\"(?P[^\"]+)', webpage, username) verifyfp_cookie = self._get_cookies('https://www.tiktok.com').get('s_v_web_id') if not verifyfp_cookie: raise ExtractorError('Improper cookies (missing s_v_web_id).', expected=True) api_url = f'https://m.tiktok.com/api/post/item_list/?aid=1988&cookie_enabled=true&count=30&verifyFp={verifyfp_cookie.value}&secUid={secuid}&cursor=' cursor = '0' for page in itertools.count(): - data_json = self._download_json(api_url + cursor, user_id, note='Downloading Page %d' % page) + data_json = self._download_json(api_url + cursor, username, note='Downloading Page %d' % page) for video in data_json.get('itemList', []): video_id = video['id'] video_url = f'https://www.tiktok.com/@{user_id}/video/{video_id}' @@ -362,7 +371,45 @@ class TikTokUserIE(InfoExtractor): if not data_json.get('hasMore'): break cursor = data_json['cursor'] + ''' + + def _entries_api(self, webpage, user_id, username): + query = { + 'user_id': user_id, + 'count': 21, + 'max_cursor': 0, + 'min_cursor': 0, + 'retry_type': 'no_retry', + 'device_id': ''.join(random.choice(string.digits) for i in range(19)), # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api. + } + + max_retries = self.get_param('extractor_retries', 3) + for page in itertools.count(1): + for retries in itertools.count(): + try: + post_list = self._call_api('aweme/post', query, username, + note='Downloading user video list page %d%s' % (page, f' (attempt {retries})' if retries != 0 else ''), + errnote='Unable to download user video list') + except ExtractorError as e: + if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0 and retries != max_retries: + self.report_warning('%s. Retrying...' % str(e.cause or e.msg)) + continue + raise + break + for video in post_list.get('aweme_list', []): + yield { + **self._parse_aweme_video(video), + 'ie_key': TikTokIE.ie_key(), + 'extractor': 'TikTok', + } + if not post_list.get('has_more'): + break + query['max_cursor'] = post_list['max_cursor'] def _real_extract(self, url): user_id = self._match_id(url) - return self.playlist_result(self._entries(url, user_id), user_id) + webpage = self._download_webpage(url, user_id, headers={ + 'User-Agent': 'facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)' + }) + own_id = self._html_search_regex(r'snssdk\d*://user/profile/(\d+)', webpage, 'user ID') + return self.playlist_result(self._entries_api(webpage, own_id, user_id), user_id)