[cleanup, zingmp3] Refactor extractors

This commit is contained in:
pukkandan 2022-05-07 15:59:55 +05:30
parent a0fe51d562
commit 6b70527f9d
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39

View file

@ -14,139 +14,64 @@ from ..utils import (
class ZingMp3BaseIE(InfoExtractor): class ZingMp3BaseIE(InfoExtractor):
_VALID_URL_TMPL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>(?:%s))/[^/]+/(?P<id>\w+)(?:\.html|\?)' _VALID_URL_TMPL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>(?:%s))/[^/?#]+/(?P<id>\w+)(?:\.html|\?)'
_GEO_COUNTRIES = ['VN'] _GEO_COUNTRIES = ['VN']
_DOMAIN = 'https://zingmp3.vn' _DOMAIN = 'https://zingmp3.vn'
_SLUG_API = { _PER_PAGE = 50
# For audio/video _API_SLUGS = {
# Audio/video
'bai-hat': '/api/v2/page/get/song', 'bai-hat': '/api/v2/page/get/song',
'embed': '/api/v2/page/get/song', 'embed': '/api/v2/page/get/song',
'video-clip': '/api/v2/page/get/video', 'video-clip': '/api/v2/page/get/video',
'lyric': '/api/v2/lyric/get/lyric', 'lyric': '/api/v2/lyric/get/lyric',
'song_streaming': '/api/v2/song/get/streaming', 'song-streaming': '/api/v2/song/get/streaming',
# For playlist # Playlist
'playlist': '/api/v2/page/get/playlist', 'playlist': '/api/v2/page/get/playlist',
'album': '/api/v2/page/get/playlist', 'album': '/api/v2/page/get/playlist',
# For chart # Chart
'zing-chart': '/api/v2/page/get/chart-home', 'zing-chart': '/api/v2/page/get/chart-home',
'zing-chart-tuan': '/api/v2/page/get/week-chart', 'zing-chart-tuan': '/api/v2/page/get/week-chart',
'moi-phat-hanh': '/api/v2/page/get/newrelease-chart', 'moi-phat-hanh': '/api/v2/page/get/newrelease-chart',
'the-loai-video': '/api/v2/video/get/list', 'the-loai-video': '/api/v2/video/get/list',
# For user # User
'info-artist': '/api/v2/page/get/artist', 'info-artist': '/api/v2/page/get/artist',
'user-list-song': '/api/v2/song/get/list', 'user-list-song': '/api/v2/song/get/list',
'user-list-video': '/api/v2/video/get/list', 'user-list-video': '/api/v2/video/get/list',
} }
_PER_PAGE = 50
_API_KEY = '88265e23d4284f25963e6eedac8fbfa3'
_SECRET_KEY = b'2aa2d1c561e809b267f3638c4a307aab'
def _extract_item(self, item, song_id, type_url, fatal): def _api_url(self, url_type, params):
item_id = item.get('encodeId') or song_id api_slug = self._API_SLUGS[url_type]
title = item.get('title') or item.get('alias') params.update({'ctime': '1'})
sha256 = hashlib.sha256(
if type_url == 'video-clip': ''.join(f'{k}={v}' for k, v in sorted(params.items())).encode()).hexdigest()
info = self._download_json( data = {
'http://api.mp3.zing.vn/api/mobile/video/getvideoinfo', item_id, **params,
query={'requestdata': json.dumps({'id': item_id})}) 'apiKey': '88265e23d4284f25963e6eedac8fbfa3',
source = item.get('streaming') 'sig': hmac.new(
if info.get('source'): b'2aa2d1c561e809b267f3638c4a307aab', f'{api_slug}{sha256}'.encode(), hashlib.sha512).hexdigest(),
source['mp4'] = info.get('source')
else:
api = self.get_api_with_signature(name_api=self._SLUG_API.get('song_streaming'), param={'id': item_id})
source = self._download_json(api, video_id=item_id).get('data')
formats = []
for k, v in (source or {}).items():
if not v:
continue
if k in ('mp4', 'hls'):
for res, video_url in v.items():
if not video_url:
continue
if k == 'hls':
formats.extend(self._extract_m3u8_formats(
video_url, item_id, 'mp4',
'm3u8_native', m3u8_id=k, fatal=False))
elif k == 'mp4':
formats.append({
'format_id': 'mp4-' + res,
'url': video_url,
'height': int_or_none(res),
})
continue
elif v == 'VIP':
continue
formats.append({
'ext': 'mp3',
'format_id': k,
'tbr': int_or_none(k),
'url': self._proto_relative_url(v),
'vcodec': 'none',
})
if not formats:
if not fatal:
return
msg = item.get('msg')
if msg == 'Sorry, this content is not available in your country.':
self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
self.raise_no_formats(msg, expected=True)
self._sort_formats(formats)
lyric = item.get('lyric')
if not lyric:
api = self.get_api_with_signature(name_api=self._SLUG_API.get("lyric"), param={'id': item_id})
info_lyric = self._download_json(api, video_id=item_id)
lyric = traverse_obj(info_lyric, ('data', 'file'))
subtitles = {
'origin': [{
'url': lyric,
}],
} if lyric else None
album = item.get('album') or {}
return {
'id': item_id,
'title': title,
'formats': formats,
'thumbnail': traverse_obj(item, 'thumbnail', 'thumbnailM'),
'subtitles': subtitles,
'duration': int_or_none(item.get('duration')),
'track': title,
'artist': traverse_obj(item, 'artistsNames', 'artists_names'),
'album': traverse_obj(album, 'name', 'title'),
'album_artist': traverse_obj(album, 'artistsNames', 'artists_names'),
} }
return f'{self._DOMAIN}{api_slug}?{urllib.parse.urlencode(data)}'
def _call_api(self, url_type, params, display_id=None, **kwargs):
resp = self._download_json(
self._api_url(url_type, params), display_id or params.get('id'),
note=f'Downloading {url_type} JSON metadata', **kwargs)
return (resp or {}).get('data') or {}
def _real_initialize(self): def _real_initialize(self):
if not self.get_param('cookiefile') and not self.get_param('cookiesfrombrowser'): if not self.get_param('cookiefile') and not self.get_param('cookiesfrombrowser'):
self._request_webpage(self.get_api_with_signature(name_api=self._SLUG_API['bai-hat'], param={'id': ''}), self._request_webpage(
None, note='Updating cookies') self._api_url('bai-hat', {'id': ''}), None, note='Updating cookies')
def _real_extract(self, url): def _parse_items(self, items):
song_id, type_url = self._match_valid_url(url).group('id', 'type') for url in traverse_obj(items, (..., 'link')) or []:
api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={'id': song_id}) yield self.url_result(urljoin(self._DOMAIN, url))
return self._process_data(self._download_json(api, song_id)['data'], song_id, type_url)
def get_api_with_signature(self, name_api, param):
param.update({'ctime': '1'})
sha256 = hashlib.sha256(''.join(f'{i}={param[i]}' for i in sorted(param)).encode('utf-8')).hexdigest()
data = {
'apiKey': self._API_KEY,
'sig': hmac.new(self._SECRET_KEY, f'{name_api}{sha256}'.encode('utf-8'), hashlib.sha512).hexdigest(),
**param,
}
return f'{self._DOMAIN}{name_api}?{urllib.parse.urlencode(data)}'
def _entries(self, items):
for item in items or []:
if item and item.get('link'):
yield self.url_result(urljoin(self._DOMAIN, item['link']))
class ZingMp3IE(ZingMp3BaseIE): class ZingMp3IE(ZingMp3BaseIE):
_VALID_URL = ZingMp3BaseIE._VALID_URL_TMPL % 'bai-hat|video-clip|embed' _VALID_URL = ZingMp3BaseIE._VALID_URL_TMPL % 'bai-hat|video-clip|embed'
IE_NAME = 'zingmp3'
IE_DESC = 'zingmp3.vn'
_TESTS = [{ _TESTS = [{
'url': 'https://mp3.zing.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html', 'url': 'https://mp3.zing.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html',
'md5': 'ead7ae13693b3205cbc89536a077daed', 'md5': 'ead7ae13693b3205cbc89536a077daed',
@ -168,7 +93,7 @@ class ZingMp3IE(ZingMp3BaseIE):
}, },
}, { }, {
'url': 'https://zingmp3.vn/video-clip/Suong-Hoa-Dua-Loi-K-ICM-RYO/ZO8ZF7C7.html', 'url': 'https://zingmp3.vn/video-clip/Suong-Hoa-Dua-Loi-K-ICM-RYO/ZO8ZF7C7.html',
'md5': 'c7f23d971ac1a4f675456ed13c9b9612', 'md5': '3c2081e79471a2f4a3edd90b70b185ea',
'info_dict': { 'info_dict': {
'id': 'ZO8ZF7C7', 'id': 'ZO8ZF7C7',
'title': 'Sương Hoa Đưa Lối', 'title': 'Sương Hoa Đưa Lối',
@ -201,11 +126,64 @@ class ZingMp3IE(ZingMp3BaseIE):
'url': 'https://zingmp3.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html', 'url': 'https://zingmp3.vn/bai-hat/Xa-Mai-Xa-Bao-Thy/ZWZB9WAB.html',
'only_matching': True, 'only_matching': True,
}] }]
IE_NAME = 'zingmp3'
IE_DESC = 'zingmp3.vn'
def _process_data(self, data, song_id, type_url): def _real_extract(self, url):
return self._extract_item(data, song_id, type_url, True) song_id, url_type = self._match_valid_url(url).group('id', 'type')
item = self._call_api(url_type, {'id': song_id})
item_id = item.get('encodeId') or song_id
if url_type == 'video-clip':
source = item.get('streaming')
source['mp4'] = self._download_json(
'http://api.mp3.zing.vn/api/mobile/video/getvideoinfo', item_id,
query={'requestdata': json.dumps({'id': item_id})},
note='Downloading mp4 JSON metadata').get('source')
else:
source = self._call_api('song-streaming', {'id': item_id})
formats = []
for k, v in (source or {}).items():
if not v or v == 'VIP':
continue
if k not in ('mp4', 'hls'):
formats.append({
'ext': 'mp3',
'format_id': k,
'tbr': int_or_none(k),
'url': self._proto_relative_url(v),
'vcodec': 'none',
})
continue
for res, video_url in v.items():
if not video_url:
continue
if k == 'hls':
formats.extend(self._extract_m3u8_formats(video_url, item_id, 'mp4', m3u8_id=k, fatal=False))
continue
formats.append({
'format_id': f'mp4-{res}',
'url': video_url,
'height': int_or_none(res),
})
if not formats and item.get('msg') == 'Sorry, this content is not available in your country.':
self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
self._sort_formats(formats)
lyric = item.get('lyric') or self._call_api('lyric', {'id': item_id}, fatal=False).get('file')
return {
'id': item_id,
'title': traverse_obj(item, 'title', 'alias'),
'thumbnail': traverse_obj(item, 'thumbnail', 'thumbnailM'),
'duration': int_or_none(item.get('duration')),
'track': traverse_obj(item, 'title', 'alias'),
'artist': traverse_obj(item, 'artistsNames', 'artists_names'),
'album': traverse_obj(item, ('album', ('name', 'title')), get_all=False),
'album_artist': traverse_obj(item, ('album', ('artistsNames', 'artists_names')), get_all=False),
'formats': formats,
'subtitles': {'origin': [{'url': lyric}]} if lyric else None,
}
class ZingMp3AlbumIE(ZingMp3BaseIE): class ZingMp3AlbumIE(ZingMp3BaseIE):
@ -233,10 +211,12 @@ class ZingMp3AlbumIE(ZingMp3BaseIE):
}] }]
IE_NAME = 'zingmp3:album' IE_NAME = 'zingmp3:album'
def _process_data(self, data, song_id, type_url): def _real_extract(self, url):
items = traverse_obj(data, ('song', 'items')) or [] song_id, url_type = self._match_valid_url(url).group('id', 'type')
return self.playlist_result(self._entries(items), traverse_obj(data, 'id', 'encodeId'), data = self._call_api(url_type, {'id': song_id})
traverse_obj(data, 'name', 'title')) return self.playlist_result(
self._parse_items(traverse_obj(data, ('song', 'items'))),
traverse_obj(data, 'id', 'encodeId'), traverse_obj(data, 'name', 'title'))
class ZingMp3ChartHomeIE(ZingMp3BaseIE): class ZingMp3ChartHomeIE(ZingMp3BaseIE):
@ -245,34 +225,26 @@ class ZingMp3ChartHomeIE(ZingMp3BaseIE):
'url': 'https://zingmp3.vn/zing-chart', 'url': 'https://zingmp3.vn/zing-chart',
'info_dict': { 'info_dict': {
'id': 'zing-chart', 'id': 'zing-chart',
'title': 'zing-chart',
}, },
'playlist_mincount': 100, 'playlist_mincount': 100,
}, { }, {
'url': 'https://zingmp3.vn/moi-phat-hanh', 'url': 'https://zingmp3.vn/moi-phat-hanh',
'info_dict': { 'info_dict': {
'id': 'moi-phat-hanh', 'id': 'moi-phat-hanh',
'title': 'moi-phat-hanh',
}, },
'playlist_mincount': 100, 'playlist_mincount': 100,
}] }]
IE_NAME = 'zingmp3:chart-home' IE_NAME = 'zingmp3:chart-home'
def _real_extract(self, url): def _real_extract(self, url):
type_url = self._match_id(url) url_type = self._match_id(url)
api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={'id': type_url}) data = self._call_api(url_type, {'id': url_type})
return self._process_data(self._download_json(api, type_url)['data'], type_url, type_url) items = traverse_obj(data, ('RTChart', 'items') if url_type == 'zing-chart' else 'items')
return self.playlist_result(self._parse_items(items), url_type)
def _process_data(self, data, chart_id, type_url):
if type_url == 'zing-chart':
items = traverse_obj(data, ('RTChart', 'items'), default=[])
else:
items = data.get('items')
return self.playlist_result(self._entries(items), type_url, type_url)
class ZingMp3WeekChartIE(ZingMp3BaseIE): class ZingMp3WeekChartIE(ZingMp3BaseIE):
_VALID_URL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<type>zing-chart-tuan)/[^/?#]+/(?P<id>\w+)' _VALID_URL = ZingMp3BaseIE._VALID_URL_TMPL % 'zing-chart-tuan'
IE_NAME = 'zingmp3:week-chart' IE_NAME = 'zingmp3:week-chart'
_TESTS = [{ _TESTS = [{
'url': 'https://zingmp3.vn/zing-chart-tuan/Bai-hat-Viet-Nam/IWZ9Z08I.html', 'url': 'https://zingmp3.vn/zing-chart-tuan/Bai-hat-Viet-Nam/IWZ9Z08I.html',
@ -297,8 +269,11 @@ class ZingMp3WeekChartIE(ZingMp3BaseIE):
'playlist_mincount': 10, 'playlist_mincount': 10,
}] }]
def _process_data(self, data, chart_id, type_url): def _real_extract(self, url):
return self.playlist_result(self._entries(data['items']), chart_id, f'zing-chart-{data.get("country", "")}') song_id, url_type = self._match_valid_url(url).group('id', 'type')
data = self._call_api(url_type, {'id': song_id})
return self.playlist_result(
self._parse_items(data['items']), song_id, f'zing-chart-{data.get("country", "")}')
class ZingMp3ChartMusicVideoIE(ZingMp3BaseIE): class ZingMp3ChartMusicVideoIE(ZingMp3BaseIE):
@ -334,33 +309,23 @@ class ZingMp3ChartMusicVideoIE(ZingMp3BaseIE):
'playlist_mincount': 10, 'playlist_mincount': 10,
}] }]
def _fetch_page(self, song_id, type_url, page): def _fetch_page(self, song_id, url_type, page):
page += 1 return self._parse_items(self._call_api(url_type, {
api = self.get_api_with_signature(name_api=self._SLUG_API[type_url], param={
'id': song_id, 'id': song_id,
'type': 'genre', 'type': 'genre',
'page': page, 'page': page + 1,
'count': self._PER_PAGE 'count': self._PER_PAGE
}) }).get('items'))
data = self._download_json(api, song_id)['data']
return self._entries(data.get('items'))
def _real_extract(self, url): def _real_extract(self, url):
song_id, regions, type_url = self._match_valid_url(url).group('id', 'regions', 'type') song_id, regions, url_type = self._match_valid_url(url).group('id', 'regions', 'type')
entries = OnDemandPagedList(functools.partial(self._fetch_page, song_id, type_url), self._PER_PAGE) return self.playlist_result(
return self.playlist_result(entries, song_id, f'{type_url}_{regions}') OnDemandPagedList(functools.partial(self._fetch_page, song_id, url_type), self._PER_PAGE),
song_id, f'{url_type}_{regions}')
class ZingMp3UserIE(ZingMp3BaseIE): class ZingMp3UserIE(ZingMp3BaseIE):
_VALID_URL = r'''(?x) _VALID_URL = r'https?://(?:mp3\.zing|zingmp3)\.vn/(?P<user>[^/]+)/(?P<type>bai-hat|single|album|video)/?(?:[?#]|$)'
https?://
(?:mp3\.zing|zingmp3)\.vn/
(?P<user>[^/]+)
(?:
/(?P<type>bai-hat|single|album|video)
)
/?(?:[?#]|$)
'''
IE_NAME = 'zingmp3:user' IE_NAME = 'zingmp3:user'
_TESTS = [{ _TESTS = [{
'url': 'https://zingmp3.vn/Mr-Siro/bai-hat', 'url': 'https://zingmp3.vn/Mr-Siro/bai-hat',
@ -396,36 +361,26 @@ class ZingMp3UserIE(ZingMp3BaseIE):
'playlist_mincount': 15, 'playlist_mincount': 15,
}] }]
def _fetch_page(self, user_id, type_url, page): def _fetch_page(self, user_id, url_type, page):
page += 1 url_type = 'user-list-song' if url_type == 'bai-hat' else 'user-list-video'
name_api = self._SLUG_API['user-list-song'] if type_url == 'bai-hat' else self._SLUG_API['user-list-video'] return self._parse_items(self._call_api(url_type, {
api = self.get_api_with_signature(name_api=name_api, param={
'id': user_id, 'id': user_id,
'type': 'artist', 'type': 'artist',
'page': page, 'page': page + 1,
'count': self._PER_PAGE 'count': self._PER_PAGE
}) }, query={'sort': 'new', 'sectionId': 'aSong'}).get('items'))
data = self._download_json(api, user_id, query={'sort': 'new', 'sectionId': 'aSong'})['data']
return self._entries(data.get('items'))
def _real_extract(self, url): def _real_extract(self, url):
user_alias, type_url = self._match_valid_url(url).group('user', 'type') user_alias, url_type = self._match_valid_url(url).group('user', 'type')
if not type_url: if not url_type:
type_url = 'bai-hat' url_type = 'bai-hat'
user_info = self._download_json(
self.get_api_with_signature(name_api=self._SLUG_API['info-artist'], param={}), user_info = self._call_api('info-artist', {}, user_alias, query={'alias': user_alias})
video_id=user_alias, query={'alias': user_alias})['data'] if url_type in ('bai-hat', 'video'):
user_id = user_info.get('id') entries = OnDemandPagedList(
biography = user_info.get('biography') functools.partial(self._fetch_page, user_info['id'], url_type), self._PER_PAGE)
if type_url == 'bai-hat' or type_url == 'video':
entries = OnDemandPagedList(functools.partial(self._fetch_page, user_id, type_url), self._PER_PAGE)
return self.playlist_result(entries, user_id, f'{user_info.get("name")} - {type_url}', biography)
else: else:
entries = [] entries = self._parse_items(traverse_obj(user_info, (
for section in user_info.get('sections', {}): 'sections', lambda _, v: v['link'] == f'/{user_alias}/{url_type}', 'items', ...)))
if section.get('link') == f'/{user_alias}/{type_url}': return self.playlist_result(
items = section.get('items') entries, user_info['id'], f'{user_info.get("name")} - {url_type}', user_info.get('biography'))
for item in items:
entries.append(self.url_result(urljoin(self._DOMAIN, item.get('link'))))
break
return self.playlist_result(entries, user_id, f'{user_info.get("name")} - {type_url}', biography)