forked from langchain-ai/chat-langchain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
112 lines (85 loc) · 2.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Main entrypoint for the app."""
import asyncio
from typing import Optional, Union
from uuid import UUID
import langsmith
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langserve import add_routes
from langsmith import Client
from pydantic import BaseModel
from chain import ChatRequest, answer_chain
# LangSmith client shared by the feedback and trace endpoints in this module.
client = Client()

# FastAPI application on which every route in this module is registered.
app = FastAPI()

# CORS is configured with wildcards for origins, methods, request headers and
# exposed headers, and with credentials allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive policy — confirm it is intended for the deployed environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)

# Register answer_chain under the "/chat" path through LangServe, declaring
# ChatRequest as the input type and listing "metadata" as the only config key.
add_routes(
    app, answer_chain, path="/chat", input_type=ChatRequest, config_keys=["metadata"]
)
class SendFeedbackBody(BaseModel):
    """Request payload accepted by the ``POST /feedback`` endpoint."""

    # Identifier of the run the feedback is attached to (required).
    run_id: UUID
    # Feedback key; falls back to "user_score" when the caller omits it.
    key: str = "user_score"
    # Optional score; a float, int or bool, or None when not provided.
    # NOTE(review): depending on the pydantic version, the order of the union
    # members (float first) may influence coercion — confirm.
    score: Optional[Union[float, int, bool]] = None
    # Optional identifier forwarded to the client as ``feedback_id``.
    feedback_id: Optional[UUID] = None
    # Optional free-text comment.
    comment: Optional[str] = None
@app.post("/feedback")
async def send_feedback(body: SendFeedbackBody):
    """Create a feedback entry for a run through the LangSmith client.

    Args:
        body: Payload carrying the run id and feedback key, plus the optional
            score, comment and feedback id.

    Returns:
        A dict with a success message under ``"result"`` and ``200`` under
        ``"code"``.
    """
    # Optional values are forwarded as keyword arguments, exactly as given
    # (including None when the caller left them out).
    optional_fields = {
        "score": body.score,
        "comment": body.comment,
        "feedback_id": body.feedback_id,
    }
    # NOTE(review): the client call runs directly inside this coroutine; if it
    # performs blocking I/O it will hold the event loop — confirm.
    client.create_feedback(body.run_id, body.key, **optional_fields)
    return {"result": "posted feedback successfully", "code": 200}
class UpdateFeedbackBody(BaseModel):
    """Request payload accepted by the ``PATCH /feedback`` endpoint."""

    # Identifier of the feedback entry to update (required).
    feedback_id: UUID
    # Optional new score; a float, int or bool, or None when not provided.
    score: Optional[Union[float, int, bool]] = None
    # Optional new free-text comment.
    comment: Optional[str] = None
@app.patch("/feedback")
async def update_feedback(body: UpdateFeedbackBody):
    """Update the score and comment of an existing feedback entry.

    Args:
        body: Payload carrying the feedback id and the optional new score and
            comment.

    Returns:
        A dict with a message under ``"result"`` and a status under ``"code"``:
        ``200`` after the client call, ``400`` when no feedback id is present.
    """
    target_id = body.feedback_id
    # NOTE(review): feedback_id is declared as a required UUID on the model, so
    # the None case looks unreachable for validated requests — confirm.
    if target_id is not None:
        client.update_feedback(
            target_id,
            score=body.score,
            comment=body.comment,
        )
        return {"result": "patched feedback successfully", "code": 200}
    return {
        "result": "No feedback ID provided",
        "code": 400,
    }
# TODO: Update when async API is available
async def _arun(func, *args, **kwargs):
    """Run a synchronous callable in the running loop's default executor.

    Args:
        func: Synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        Any exception raised by ``func`` is propagated to the awaiting caller.
    """
    # Imported here so the helper does not rely on the module's import block.
    import functools

    # ``run_in_executor`` forwards positional arguments only; passing keyword
    # arguments to it raises TypeError. Bind everything into a zero-argument
    # callable first so keyword arguments are supported as the signature implies.
    bound_call = functools.partial(func, *args, **kwargs)
    return await asyncio.get_running_loop().run_in_executor(None, bound_call)
async def aget_trace_url(run_id: str) -> str:
    """Return a share link for the given LangSmith run.

    The run is first polled with ``client.read_run`` (up to five attempts) and
    ``LangSmithError`` failures are retried with an increasing delay. After
    polling — whether or not it succeeded — the existing shared link is
    returned if the run is already shared; otherwise the run is shared and the
    result of that call is returned.

    Args:
        run_id: Run identifier, as a string.

    Returns:
        The value produced by ``client.read_run_shared_link`` or
        ``client.share_run``.
    """
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            await _arun(client.read_run, run_id)
            break
        except langsmith.utils.LangSmithError:
            # Exponential backoff: 1s, 2s, 4s, 8s. The earlier ``1**i`` always
            # evaluated to 1, so the delay never grew. Nothing is retried after
            # the final attempt, so no sleep is needed there.
            if attempt < max_attempts - 1:
                await asyncio.sleep(2**attempt)
    if await _arun(client.run_is_shared, run_id):
        return await _arun(client.read_run_shared_link, run_id)
    return await _arun(client.share_run, run_id)
class GetTraceBody(BaseModel):
    """Request payload accepted by the ``POST /get_trace`` endpoint."""

    # Identifier of the run whose trace link is requested (required).
    run_id: UUID
@app.post("/get_trace")
async def get_trace(body: GetTraceBody):
    """Return the trace link for the run named in the request.

    Args:
        body: Payload carrying the run id.

    Returns:
        The result of ``aget_trace_url`` for the stringified run id, or a dict
        with an error message and ``"code": 400`` when no run id is present.
    """
    requested_run = body.run_id
    # NOTE(review): run_id is declared as a required UUID on the model, so the
    # None case looks unreachable for validated requests — confirm.
    if requested_run is not None:
        return await aget_trace_url(str(requested_run))
    return {
        "result": "No LangSmith run ID provided",
        "code": 400,
    }
if __name__ == "__main__":
    # Script entrypoint: serve the app with uvicorn on all interfaces, port 8080.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8080}
    uvicorn.run(app, **server_options)