forked from wzpan/wukong-robot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wukong.py
163 lines (137 loc) · 5.41 KB
/
wukong.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
from snowboy import snowboydecoder
from robot import config, utils, constants, logging, statistic, Player
from robot.Updater import Updater
from robot.ConfigMonitor import ConfigMonitor
from robot.Conversation import Conversation
from server import server
from watchdog.observers import Observer
import sys
import os
import signal
import hashlib
import fire
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
class Wukong(object):
    """Entry point object of wukong-robot.

    Wires together the conversation engine, the config-file watcher, the
    web server and the snowboy hotword detector.  The public methods
    (``run``, ``md5``, ``update``, ``fetch``, ``restart``, ``profiling``,
    ``dev``) are also what ``fire.Fire(Wukong)`` exposes on the command line.
    """

    # Passed to Conversation() in init(); switched on by profiling().
    _profiling = False
    # Switched on by dev(); not read anywhere else in this class.
    _dev = False

    def init(self):
        """Print the banner, load config, greet the user and start watching config/data dirs.

        Note this is a plain method called from ``run()``, not ``__init__``.
        """
        self.detector = None
        self._interrupted = False
        print('''
********************************************************
* wukong-robot - 中文语音对话机器人 *
* (c) 2019 潘伟洲 <[email protected]> *
* https://github.com/wzpan/wukong-robot.git *
********************************************************
如需退出,可以按 Ctrl-4 组合键。
''')
        config.init()
        self._conversation = Conversation(self._profiling)
        self._conversation.say('{} 你好!试试对我喊唤醒词叫醒我吧'.format(config.get('first_name', '主人')), True)
        # One handler is scheduled on both directories, non-recursively.
        self._observer = Observer()
        event_handler = ConfigMonitor(self._conversation)
        self._observer.schedule(event_handler, constants.CONFIG_PATH, False)
        self._observer.schedule(event_handler, constants.DATA_PATH, False)
        self._observer.start()

    def _signal_handler(self, signal, frame):
        """SIGINT handler: flag the detector loop to stop, clean up and stop the observer.

        The ``signal`` parameter shadows the imported ``signal`` module inside
        this method only; the name is kept for interface compatibility.
        """
        self._interrupted = True
        utils.clean()
        self._observer.stop()

    def _detected_callback(self):
        """Hotword callback: beep and switch the conversation into recording state.

        Does nothing (beyond a warning log) when ``utils.is_proper_time()`` is
        false or when a recording is already in progress.
        """
        if not utils.is_proper_time():
            logger.warning('勿扰模式开启中')
            return
        if self._conversation.isRecording:
            logger.warning('正在录音中,跳过')
            return
        Player.play(constants.getData('beep_hi.wav'))
        logger.info('开始录音')
        self._conversation.interrupt()
        self._conversation.isRecording = True

    def _do_not_bother_on_callback(self):
        """Hotword callback: enable do-not-disturb if the hotword switch is configured on."""
        if config.get('/do_not_bother/hotword_switch', False):
            utils.do_not_bother = True
            Player.play(constants.getData('off.wav'))
            logger.info('勿扰模式打开')

    def _do_not_bother_off_callback(self):
        """Hotword callback: disable do-not-disturb if the hotword switch is configured on."""
        if config.get('/do_not_bother/hotword_switch', False):
            utils.do_not_bother = False
            Player.play(constants.getData('on.wav'))
            logger.info('勿扰模式关闭')

    def _interrupt_callback(self):
        """Return True once ``_signal_handler`` has requested the detector loop to stop."""
        return self._interrupted

    def run(self):
        """Initialise everything, start the web server and enter the hotword loop."""
        self.init()
        # capture SIGINT signal, e.g., Ctrl+C
        signal.signal(signal.SIGINT, self._signal_handler)
        # site
        server.run(self._conversation, self)
        statistic.report(0)
        try:
            self.initDetector()
        except AttributeError:
            logger.error('初始化离线唤醒功能失败')

    def initDetector(self):
        """(Re)create the snowboy detector and run its listening loop.

        Any previously created detector is terminated first.  When
        ``/do_not_bother/hotword_switch`` is enabled, three models (wake-up,
        do-not-disturb on, do-not-disturb off) are loaded with one callback
        each; otherwise only the wake-up model is used.
        """
        if self.detector is not None:
            self.detector.terminate()

        # Read the switch once: the models and the callbacks must be built
        # from the same value, otherwise a config reload in between could
        # leave them with different lengths.
        hotword_switch = config.get('/do_not_bother/hotword_switch', False)
        wake_model = constants.getHotwordModel(config.get('hotword', 'wukong.pmdl'))
        if hotword_switch:
            models = [
                wake_model,
                constants.getHotwordModel(utils.get_do_not_bother_on_hotword()),
                constants.getHotwordModel(utils.get_do_not_bother_off_hotword()),
            ]
            callbacks = [
                self._detected_callback,
                self._do_not_bother_on_callback,
                self._do_not_bother_off_callback,
            ]
        else:
            models = wake_model
            callbacks = self._detected_callback

        self.detector = snowboydecoder.HotwordDetector(models, sensitivity=config.get('sensitivity', 0.5))
        # main loop
        try:
            self.detector.start(detected_callback=callbacks,
                                audio_recorder_callback=self._conversation.converse,
                                interrupt_check=self._interrupt_callback,
                                silent_count_threshold=config.get('silent_threshold', 15),
                                # NOTE(review): the x4 factor presumably converts the configured
                                # value into detector ticks - confirm against snowboydecoder.
                                recording_timeout=config.get('recording_timeout', 5) * 4,
                                sleep_time=0.03)
            self.detector.terminate()
        except Exception as e:
            logger.critical('离线唤醒机制初始化失败:{}'.format(e))

    def md5(self, password):
        """Return the hexadecimal MD5 digest of ``password`` (encoded as UTF-8)."""
        return hashlib.md5(password.encode('utf-8')).hexdigest()

    def update(self):
        """Run ``Updater.update()`` and return its result."""
        updater = Updater()
        return updater.update()

    def fetch(self):
        """Run ``Updater.fetch()``; returns nothing."""
        updater = Updater()
        updater.fetch()

    def restart(self):
        """Terminate the detector (best effort) and re-exec the current process."""
        logger.critical('程序重启...')
        try:
            self.detector.terminate()
        except AttributeError:
            # The detector may be None or not created yet; nothing to stop.
            pass
        python = sys.executable
        # Replace this process with a fresh interpreter using the same argv.
        os.execl(python, python, *sys.argv)

    def profiling(self):
        """Enable profiling mode and start the robot."""
        logger.info('性能调优')
        self._profiling = True
        self.run()

    def dev(self):
        """Enable the development-environment flag and start the robot."""
        logger.info('使用测试环境')
        self._dev = True
        self.run()
if __name__ == '__main__':
    # Any command-line argument is handed to python-fire, which dispatches to
    # the matching Wukong method; with no arguments the robot simply starts.
    if len(sys.argv) != 1:
        fire.Fire(Wukong)
    else:
        wukong = Wukong()
        wukong.run()