aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/extractor/huya.py
blob: 4e96f22faa372581c930b23f8cd0e619ef4d5c8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# coding: utf-8
from __future__ import unicode_literals

import hashlib
import random

from ..compat import compat_urlparse, compat_b64decode

from ..utils import (
    ExtractorError,
    int_or_none,
    js_to_json,
    str_or_none,
    try_get,
    unescapeHTML,
    update_url_query,
)

from .common import InfoExtractor


class HuyaLiveIE(InfoExtractor):
    _VALID_URL = r'https?://(?:www\.|m\.)?huya\.com/(?P<id>[^/#?&]+)(?:\D|$)'
    IE_NAME = 'huya:live'
    IE_DESC = 'huya.com'
    TESTS = [{
        'url': 'https://www.huya.com/572329',
        'info_dict': {
            'id': '572329',
            'title': str,
            'description': str,
            'is_live': True,
            'view_count': int,
        },
        'params': {
            'skip_download': True,
        },
    }, {
        'url': 'https://www.huya.com/xiaoyugame',
        'only_matching': True
    }]

    _RESOLUTION = {
        '蓝光4M': {
            'width': 1920,
            'height': 1080,
        },
        '超清': {
            'width': 1280,
            'height': 720,
        },
        '流畅': {
            'width': 800,
            'height': 480
        }
    }

    def _real_extract(self, url):
        video_id = self._match_id(url)
        webpage = self._download_webpage(url, video_id=video_id)
        json_stream = self._search_regex(r'"stream":\s+"([a-zA-Z0-9+=/]+)"', webpage, 'stream', default=None)
        if not json_stream:
            raise ExtractorError('Video is offline', expected=True)
        stream_data = self._parse_json(compat_b64decode(json_stream).decode(), video_id=video_id,
                                       transform_source=js_to_json)
        room_info = try_get(stream_data, lambda x: x['data'][0]['gameLiveInfo'])
        if not room_info:
            raise ExtractorError('Can not extract the room info', expected=True)
        title = room_info.get('roomName') or room_info.get('introduction') or self._html_extract_title(webpage)
        screen_type = room_info.get('screenType')
        live_source_type = room_info.get('liveSourceType')
        stream_info_list = stream_data['data'][0]['gameStreamInfoList']
        formats = []
        for stream_info in stream_info_list:
            stream_url = stream_info.get('sFlvUrl')
            if not stream_url:
                continue
            stream_name = stream_info.get('sStreamName')
            re_secret = not screen_type and live_source_type in (0, 8, 13)
            params = dict(compat_urlparse.parse_qsl(unescapeHTML(stream_info['sFlvAntiCode'])))
            fm, ss = '', ''
            if re_secret:
                fm, ss = self.encrypt(params, stream_info, stream_name)
            for si in stream_data.get('vMultiStreamInfo'):
                rate = si.get('iBitRate')
                if rate:
                    params['ratio'] = rate
                else:
                    params.pop('ratio', None)
                if re_secret:
                    params['wsSecret'] = hashlib.md5(
                        '_'.join([fm, params['u'], stream_name, ss, params['wsTime']]))
                formats.append({
                    'ext': stream_info.get('sFlvUrlSuffix'),
                    'format_id': str_or_none(stream_info.get('iLineIndex')),
                    'tbr': rate,
                    'url': update_url_query(f'{stream_url}/{stream_name}.{stream_info.get("sFlvUrlSuffix")}',
                                            query=params),
                    **self._RESOLUTION.get(si.get('sDisplayName'), {}),
                })

        self._sort_formats(formats)

        return {
            'id': video_id,
            'title': title,
            'formats': formats,
            'view_count': room_info.get('totalCount'),
            'thumbnail': room_info.get('screenshot'),
            'description': room_info.get('contentIntro'),
            'http_headers': {
                'Origin': 'https://www.huya.com',
                'Referer': 'https://www.huya.com/',
            },
        }

    def encrypt(self, params, stream_info, stream_name):
        ct = int_or_none(params.get('wsTime'), 16) + random.random()
        presenter_uid = stream_info['lPresenterUid']
        if not stream_name.startswith(str(presenter_uid)):
            uid = presenter_uid
        else:
            uid = int_or_none(ct % 1e7 * 1e6 % 0xffffffff)
        u1 = uid & 0xffffffff00000000
        u2 = uid & 0xffffffff
        u3 = uid & 0xffffff
        u = u1 | u2 >> 24 | u3 << 8
        params.update({
            'u': str_or_none(u),
            'seqid': str_or_none(int_or_none(ct * 1000) + uid),
            'ver': '1',
            'uuid': int_or_none(ct % 1e7 * 1e6 % 0xffffffff),
            't': '100',
        })
        fm = compat_b64decode(params['fm']).decode().split('_', 1)[0]
        ss = hashlib.md5('|'.join([params['seqid'], params['ctype'], params['t']]))
        return fm, ss