-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy path__init__.py
43 lines (29 loc) · 1004 Bytes
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from contextlib import asynccontextmanager
from broadcaster import Broadcast
from fastapi import FastAPI
from project.config import settings
# Module-level Broadcast instance shared by the application; lifespan() below
# connects it before the app runs and disconnects it afterwards.
# NOTE(review): settings.WS_MESSAGE_QUEUE is presumably the URL of the
# message-queue backend -- confirm against project.config.
broadcast = Broadcast(settings.WS_MESSAGE_QUEUE)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Keep the module-level ``broadcast`` connected while the app runs.

    The code before ``yield`` runs when the context is entered and the code
    after it runs when the context is exited. ``create_app`` passes this
    function to ``FastAPI(lifespan=...)``.

    Args:
        app: The FastAPI application the lifespan is attached to. It is not
            used by this function.

    Yields:
        None. Control returns to the caller while the connection is open.
    """
    await broadcast.connect()
    try:
        yield
    finally:
        # Always release the connection, even when an exception or a
        # cancellation is thrown into the generator at the yield point;
        # without the finally the disconnect would be skipped in that case.
        await broadcast.disconnect()
def create_app() -> FastAPI:
    """Build, configure and return the FastAPI application.

    Steps, in order: create the app with the ``lifespan`` context manager,
    configure logging, attach a Celery app as ``celery_app``, include the
    users, tdd and ws routers, register the socket.io app, and add the
    ``/`` route.

    Returns:
        The fully configured FastAPI instance.
    """
    # NOTE(review): project imports are kept function-local and interleaved
    # with their use, presumably to avoid circular imports -- confirm before
    # hoisting any of them to module level.
    application = FastAPI(lifespan=lifespan)

    from project.logging import configure_logging

    configure_logging()

    # The Celery app must exist before any route module is loaded.
    from project.celery_utils import create_celery

    celery = create_celery()
    application.celery_app = celery

    from project.users import users_router

    application.include_router(users_router)

    from project.tdd import tdd_router

    application.include_router(tdd_router)

    from project.ws import ws_router

    application.include_router(ws_router)

    from project.ws.views import register_socketio_app

    register_socketio_app(application)

    # Registered last, after every router has been included.
    @application.get("/")
    async def root():
        return {"message": "Hello World"}

    return application