forked from easychen/openai-api-proxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetchsse.js
64 lines (60 loc) · 1.78 KB
/
fetchsse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Adapts a reader-based stream into an async iterable of its chunks.
 *
 * Obtains a reader via `stream.getReader()` and yields each value produced by
 * `reader.read()` until a result reports `done`. The reader's lock is released
 * when iteration finishes, fails, or is abandoned early by the consumer.
 *
 * @param {{ getReader: Function }} stream - Object exposing `getReader()`.
 * @yields {*} Each chunk value read from the stream, in order.
 */
async function* streamAsyncIterable(stream) {
  const streamReader = stream.getReader();
  try {
    for (
      let result = await streamReader.read();
      !result.done;
      result = await streamReader.read()
    ) {
      yield result.value;
    }
  } finally {
    // Runs on normal completion, on errors, and on early `break`/`return`.
    streamReader.releaseLock();
  }
}
// add timeout to fetchSSE
/**
 * Picks the error class used for failures raised by fetchSSE.
 *
 * `ChatGPTError` is used when such a binding is in scope; otherwise the
 * built-in `Error` is used so the failure path never dies with a
 * ReferenceError. `typeof` is safe on an undeclared identifier.
 *
 * @returns {Function} Error constructor to instantiate.
 */
function resolveSSEErrorClass() {
  return typeof ChatGPTError === "function" ? ChatGPTError : Error;
}

/**
 * Performs a request and feeds the response body through an
 * `eventsource-parser` parser, invoking `options.onMessage` with the `data`
 * of every parsed event whose type is "event".
 *
 * @param {*} url - Passed unchanged as the first argument of `fetch2`.
 * @param {object} options - Request options.
 * @param {Function} options.onMessage - Called with `event.data` per event.
 * @param {number} [options.timeout] - Milliseconds before the request is
 *   aborted; falsy values fall back to 5000. The timer is cleared as soon as
 *   `fetch2` settles, so it does not bound the time spent reading the body.
 *   All remaining properties of `options` are forwarded to `fetch2`, except
 *   that `signal` is always replaced by the internal abort signal.
 * @param {Function} [fetch2=fetch] - fetch-compatible function to call.
 * @returns {Promise<void>} Settles once the response body has been consumed.
 * @throws {Error} When the response is not `ok`; the error carries
 *   `statusCode`, `statusText` and `context` ({ url, options }). Also thrown
 *   when the response body is neither a reader-based stream nor an object
 *   exposing `on` and `read`.
 */
async function fetchSSE(url, options, fetch2 = fetch) {
  const { createParser } = await import("eventsource-parser");
  const { onMessage, timeout, ...fetchOptions } = options;
  const SSEError = resolveSSEErrorClass();

  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeout || 5000);
  let res;
  try {
    res = await fetch2(url, { ...fetchOptions, signal: controller.signal });
  } finally {
    // Clear the timer even when fetch2 rejects, so no stray timer is left
    // pending after a failed request.
    clearTimeout(timeoutId);
  }

  if (!res.ok) {
    let reason;
    try {
      reason = await res.text();
    } catch (err) {
      // Body could not be read; fall back to the status line.
      reason = res.statusText;
    }
    const msg = `ChatGPT error ${res.status}: ${reason}`;
    const error = new SSEError(msg, { cause: res });
    error.statusCode = res.status;
    error.statusText = res.statusText;
    error.context = { url, options };
    throw error;
  }

  const parser = createParser((event) => {
    if (event.type === "event") {
      onMessage(event.data);
    }
  });

  // One decoder in streaming mode for the whole body: a multi-byte UTF-8
  // character split across two chunks is buffered instead of being decoded
  // as replacement characters.
  const decoder = new TextDecoder();
  const feedChunk = (chunk) => {
    const text =
      typeof chunk === "string"
        ? chunk
        : decoder.decode(chunk, { stream: true });
    if (text) {
      parser.feed(text);
    }
  };

  const body = res.body;
  if (!body.getReader) {
    // No web-stream reader: expect an event-emitter style body instead.
    if (!body.on || !body.read) {
      throw new SSEError('unsupported "fetch" implementation');
    }
    // Wait for the body to finish so both branches resolve at the same point,
    // and so a stream error rejects this promise instead of surfacing as an
    // unhandled "error" event.
    await new Promise((resolve, reject) => {
      body.on("readable", () => {
        try {
          let chunk;
          while (null !== (chunk = body.read())) {
            feedChunk(chunk);
          }
        } catch (err) {
          reject(err);
        }
      });
      body.on("end", resolve);
      // "close" without "end" (premature teardown) must not hang the caller.
      body.on("close", resolve);
      body.on("error", reject);
    });
  } else {
    for await (const chunk of streamAsyncIterable(body)) {
      feedChunk(chunk);
    }
  }

  // Flush any bytes the streaming decoder is still holding.
  const tail = decoder.decode();
  if (tail) {
    parser.feed(tail);
  }
}
// CommonJS export: fetchSSE is the module's only export.
module.exports = fetchSSE;