-
Notifications
You must be signed in to change notification settings - Fork 1
/
huya.py
78 lines (63 loc) · 2.83 KB
/
huya.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import re
import aiohttp
from tars import tarscore
from tars.models import *
ayyuid_int = 0
class Huya:
wss_url = 'wss://cdnws.api.huya.com/'
heartbeat = b'\x00\x03\x1d\x00\x00\x69\x00\x00\x00\x69\x10\x03\x2c\x3c\x4c\x56\x08\x6f\x6e\x6c\x69\x6e\x65\x75' \
b'\x69\x66\x0f\x4f\x6e\x55\x73\x65\x72\x48\x65\x61\x72\x74\x42\x65\x61\x74\x7d\x00\x00\x3c\x08\x00' \
b'\x01\x06\x04\x74\x52\x65\x71\x1d\x00\x00\x2f\x0a\x0a\x0c\x16\x00\x26\x00\x36\x07\x61\x64\x72\x5f' \
b'\x77\x61\x70\x46\x00\x0b\x12\x03\xae\xf0\x0f\x22\x03\xae\xf0\x0f\x3c\x42\x6d\x52\x02\x60\x5c\x60' \
b'\x01\x7c\x82\x00\x0b\xb0\x1f\x9c\xac\x0b\x8c\x98\x0c\xa8\x0c '
def __init__(self):
super().__init__()
async def shake_hands(self, url):
reg_datas = []
url = 'https://m.huya.com/' + url.split('/')[-1]
headers = {
'user-agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, '
'like Gecko) Chrome/79.0.3945.88 Mobile Safari/537.36'}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
room_page = await resp.text()
m = re.search(r"lYyid\":([0-9]+)", room_page, re.MULTILINE)
ayyuid = m.group(1)
m = re.search(r"lChannelId\":([0-9]+)", room_page, re.MULTILINE)
tid = m.group(1)
m = re.search(r"lSubChannelId\":([0-9]+)", room_page, re.MULTILINE)
sid = m.group(1)
oos = tarscore.TarsOutputStream()
oos.write(tarscore.int64, 0, int(ayyuid))
oos.write(tarscore.boolean, 1, True) # Anonymous
oos.write(tarscore.string, 2, "") # sGuid
oos.write(tarscore.string, 3, "")
oos.write(tarscore.int64, 4, int(tid))
oos.write(tarscore.int64, 5, int(sid))
oos.write(tarscore.int64, 6, 0)
oos.write(tarscore.int64, 7, 0)
wscmd = tarscore.TarsOutputStream()
wscmd.write(tarscore.int32, 0, 1)
wscmd.write(tarscore.bytes, 1, oos.getBuffer())
reg_datas.append(wscmd.getBuffer())
return Huya.wss_url, reg_datas
@staticmethod
def decode(bis):
stream = tarscore.TarsInputStream(bis)
command = WebSocketCommand()
command.readFrom(stream)
return command
@staticmethod
def encode(message):
wup = Wup()
wup.set_Req(SendMessageReq(message, ayyuid_int))
command = WebSocketCommand()
command.iCmdType = 3
command.vData = wup.bin_buffer()
return command.bin_buffer()
@staticmethod
def login():
command = WebSocketCommand()
command.iCmdType = 10
command.vData = WSVerifyCookieReq().bin_buffer()
return command.bin_buffer()