forked from ccev/TrafficLight
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrafficlight.py
54 lines (37 loc) · 1.4 KB
/
trafficlight.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio
import json
from aiohttp import web
from pydantic import ValidationError
from trafficlight.config import config
from trafficlight.model import RequestModel
from trafficlight.output import get_output
from trafficlight.proto_utils import Proto
output = get_output(config.output)
class TrafficReceiver:
@staticmethod
async def process_data(data: RequestModel):
processed_protos = [Proto.from_raw(data.rpcid, p) for p in data.protos]
await output.add_record(rpc_id=data.rpcid, rpc_status=data.rpcstatus, protos=processed_protos)
@staticmethod
async def __traffic_post(request: web.Request):
try:
data = await request.json()
except json.JSONDecodeError:
return web.Response(status=400, text="bad json")
try:
model = RequestModel(**data)
except ValidationError as e:
return web.Response(status=400, text=f"malformed data: {e}")
await TrafficReceiver.process_data(model)
return web.Response(text="OK")
def get_app(self):
app = web.Application()
app.add_routes([web.post("/", self.__traffic_post)])
return app
t = TrafficReceiver()
server = t.get_app()
async def main():
await output.start()
asyncio.create_task(web._run_app(server, host=config.host, port=config.port, print=lambda _: _))
await asyncio.Event().wait()
asyncio.run(main())