-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
57 lines (43 loc) · 1.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import time
from can.interface import Bus
from pymitter import EventEmitter
from config.config import conf
from models.debug_messages import init_debug
from models.message_queue_events import MessageQueueEvents
from robot.robot import Robot
from utils.colors import bcolors, colorit
from utils.helper import proper_exit
from webserver.webserver import WebServer
# Choose the python-can backend: the in-process "virtual" interface is used
# whenever either virtual debug flag is set, otherwise the real SocketCAN one.
if conf["DEBUG_VIRTUAL"] or conf["DEBUG_VCAN"]:
    _bustype = "virtual"
else:
    _bustype = "socketcan"

# The vcan debug mode reads its channel name from a dedicated config key.
if conf["DEBUG_VCAN"]:
    _channel = conf["VCHANNEL"]
else:
    _channel = conf["CHANNEL"]

# Shared module-level objects: the CAN bus, the event emitter that fans
# received packets out to listeners, and the robot wired to both.
bus = Bus(channel=_channel, bustype=_bustype, bitrate=conf["CAN_BAUD"])
events = EventEmitter()
robot = Robot(bus, events)
# Bind the debug bus handle up front so the ``finally`` clause can always
# inspect it, even when start-up fails (or Ctrl+C arrives) before
# ``init_debug`` has had a chance to run.
v_bus = None

try:
    webServer = WebServer(host="0.0.0.0", port=5000, global_events=events, robot=robot)
    webServer.start()

    if conf["DEBUG_VIRTUAL"]:
        time.sleep(5)  # give the web clients time to connect

    if conf["DEBUG_CAN"] or conf["DEBUG_VIRTUAL"] or conf["DEBUG_VCAN"]:
        v_bus = init_debug(bus, _bustype, _channel, events)

    # Main receive loop: forward every CAN frame to the event emitter.
    while True:
        message = bus.recv()  # called without a timeout; potential bottle-neck
        if message is not None:
            events.emit(MessageQueueEvents.NEW_CAN_PACKET.value, message)
            if conf["DEBUG_MESSAGES"]:
                print(colorit(str(message), bcolors.OKCYAN))

        # NOTE(review): nesting of the sleep under DEBUG_CYCLE was reconstructed
        # from a whitespace-stripped source -- confirm against the repository.
        if conf["DEBUG_CYCLE"]:
            events.emit(MessageQueueEvents.END_CYCLE.value)
            time.sleep(1)
except KeyboardInterrupt:
    print(colorit("KeyboardInterrupt", bcolors.FAIL))
except Exception as e:
    # Top-level boundary: report the error, then fall through to cleanup.
    print(colorit(str(e), bcolors.FAIL))
finally:
    bus.shutdown()
    # v_bus stays None when init_debug never ran, so this check is now safe
    # on every exit path instead of raising NameError.
    if conf["DEBUG_VIRTUAL"] and v_bus is not None:
        v_bus.shutdown()
    proper_exit()