forked from xszyou/Fay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathali_nls.py
191 lines (171 loc) · 6.67 KB
/
ali_nls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from threading import Thread
from threading import Lock
import websocket
import json
import time
import ssl
import wave
import _thread as thread
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from core import wsa_server
from scheduler.thread_manager import MyThread
from utils import util
from utils import config_util as cfg
from core.authorize_tb import Authorize_Tb
__running = True
__my_thread = None
_token = ''
def __post_token():
global _token
__client = AcsClient(
cfg.key_ali_nls_key_id,
cfg.key_ali_nls_key_secret,
"cn-shanghai"
)
__request = CommonRequest()
__request.set_method('POST')
__request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
__request.set_version('2019-02-28')
__request.set_action_name('CreateToken')
info = json.loads(__client.do_action_with_exception(__request))
_token = info['Token']['Id']
authorize = Authorize_Tb()
authorize_info = authorize.find_by_userid(cfg.key_ali_nls_key_id)
if authorize_info is not None:
authorize.update_by_userid(cfg.key_ali_nls_key_id, _token, info['Token']['ExpireTime']*1000)
else:
authorize.add(cfg.key_ali_nls_key_id, _token, info['Token']['ExpireTime']*1000)
def __runnable():
while __running:
__post_token()
time.sleep(60 * 60 * 12)
def start():
MyThread(target=__runnable).start()
class ALiNls:
# 初始化
def __init__(self, username):
self.__URL = 'wss://nls-gateway-cn-shenzhen.aliyuncs.com/ws/v1'
self.__ws = None
self.__frames = []
self.started = False
self.__closing = False
self.__task_id = ''
self.done = False
self.finalResults = ""
self.username = username
self.data = b''
self.__endding = False
self.__is_close = False
self.lock = Lock()
def __create_header(self, name):
if name == 'StartTranscription':
self.__task_id = util.random_hex(32)
header = {
"appkey": cfg.key_ali_nls_app_key,
"message_id": util.random_hex(32),
"task_id": self.__task_id,
"namespace": "SpeechTranscriber",
"name": name
}
return header
# 收到websocket消息的处理
def on_message(self, ws, message):
try:
data = json.loads(message)
header = data['header']
name = header['name']
if name == 'TranscriptionStarted':
self.started = True
if name == 'SentenceEnd':
self.done = True
self.finalResults = data['payload']['result']
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({"panelMsg": self.finalResults, "Username" : self.username})
if wsa_server.get_instance().is_connected(self.username):
content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': self.finalResults}, 'Username' : self.username}
wsa_server.get_instance().add_cmd(content)
ws.close()#TODO
elif name == 'TranscriptionResultChanged':
self.finalResults = data['payload']['result']
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({"panelMsg": self.finalResults, "Username" : self.username})
if wsa_server.get_instance().is_connected(self.username):
content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': self.finalResults}, 'Username' : self.username}
wsa_server.get_instance().add_cmd(content)
except Exception as e:
print(e)
# print("### message:", message)
# 收到websocket的关闭要求
def on_close(self, ws, code, msg):
self.__endding = True
self.__is_close = True
if msg:
print("aliyun asr服务不太稳定:", msg)
# 收到websocket错误的处理
def on_error(self, ws, error):
print("aliyun asr error:", error)
# 收到websocket连接建立的处理
def on_open(self, ws):
self.__endding = False
#为了兼容多路asr,关闭过程数据
def run(*args):
while self.__endding == False:
try:
if len(self.__frames) > 0:
with self.lock:
frame = self.__frames.pop(0)
if isinstance(frame, dict):
ws.send(json.dumps(frame))
elif isinstance(frame, bytes):
ws.send(frame, websocket.ABNF.OPCODE_BINARY)
self.data += frame
else:
time.sleep(0.001) # 避免忙等
except Exception as e:
print(e)
break
if self.__is_close == False:
for frame in self.__frames:
ws.send(frame, websocket.ABNF.OPCODE_BINARY)
frame = {"header": self.__create_header('StopTranscription')}
ws.send(json.dumps(frame))
thread.start_new_thread(run, ())
def __connect(self):
self.finalResults = ""
self.done = False
with self.lock:
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL + '?token=' + _token, on_message=self.on_message)
self.__ws.on_open = self.on_open
self.__ws.on_error = self.on_error
self.__ws.on_close = self.on_close
self.__ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
def send(self, buf):
with self.lock:
self.__frames.append(buf)
def start(self):
Thread(target=self.__connect, args=[]).start()
data = {
'header': self.__create_header('StartTranscription'),
"payload": {
"format": "pcm",
"sample_rate": 16000,
"enable_intermediate_result": True,
"enable_punctuation_prediction": False,
"enable_inverse_text_normalization": True,
"speech_noise_threshold": -1
}
}
self.send(data)
def end(self):
self.__endding = True
with wave.open('cache_data/input2.wav', 'wb') as wf:
# 设置音频参数
n_channels = 1 # 单声道
sampwidth = 2 # 16 位音频,每个采样点 2 字节
wf.setnchannels(n_channels)
wf.setsampwidth(sampwidth)
wf.setframerate(16000)
wf.writeframes(self.data)
self.data = b''