Skip to content

Commit

Permalink
feat: implement ability to abort request in clients
Browse files Browse the repository at this point in the history
  • Loading branch information
waylaidwanderer committed Feb 28, 2023
1 parent 2ad7386 commit c95681b
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
8 changes: 8 additions & 0 deletions bin/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ await server.register(cors, {

server.post('/conversation', async (request, reply) => {
const body = request.body || {};
const abortController = new AbortController();

reply.raw.on('close', () => {
if (abortController.signal.aborted === false) {
abortController.abort();
}
});

let onProgress;
if (body.stream === true) {
Expand Down Expand Up @@ -110,6 +117,7 @@ server.post('/conversation', async (request, reply) => {
clientId: body.clientId,
invocationId: body.invocationId,
onProgress,
abortController,
});
} catch (e) {
error = e;
Expand Down
13 changes: 12 additions & 1 deletion src/BingAIClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ export default class BingAIClient {
clientId,
invocationId = 0,
onProgress,
abortController = new AbortController(),
} = opts;

if (typeof onProgress !== 'function') {
Expand All @@ -142,6 +143,7 @@ export default class BingAIClient {
}

const ws = await this.createWebSocketConnection();

const obj = {
arguments: [
{
Expand Down Expand Up @@ -175,10 +177,19 @@ export default class BingAIClient {

const messagePromise = new Promise((resolve, reject) => {
let replySoFar = '';

const messageTimeout = setTimeout(() => {
this.cleanupWebSocketConnection(ws);
reject(new Error('Timed out waiting for response. Try enabling debug mode to see more information.'))
}, 120 * 1000,);
}, 120 * 1000);

// abort the request if the abort controller is aborted
abortController.signal.addEventListener('abort', () => {
clearTimeout(messageTimeout);
this.cleanupWebSocketConnection(ws);
reject('Request aborted');
});

ws.on('message', (data) => {
const objects = data.toString().split('');
const events = objects.map((object) => {
Expand Down
14 changes: 9 additions & 5 deletions src/ChatGPTBrowserClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ export default class ChatGPTBrowserClient {
this.conversationsCache = new Keyv(cacheOptions);
}

async postConversation(conversation, onProgress) {
async postConversation(conversation, onProgress, abortController = null) {
const {
action = 'next',
conversationId,
parentMessageId = crypto.randomUUID(),
message,
} = conversation;

if (!abortController) {
abortController = new AbortController();
}

const debug = this.options.debug;
const url = this.options.reverseProxyUrl || 'https://chat.openai.com/backend-api/conversation';
const opts = {
Expand Down Expand Up @@ -60,12 +64,11 @@ export default class ChatGPTBrowserClient {
// data: {"message": {"id": "UUID", "role": "assistant", "user": null, "create_time": null, "update_time": null, "content": {"content_type": "text", "parts": ["That's alright! If you don't have a specific question or topic in mind, I can suggest some general conversation starters or topics to explore. \n\nFor example, we could talk about your interests, hobbies, or goals. Alternatively, we could discuss current events, pop culture, or science and technology. Is there anything in particular that you're curious about or would like to learn more about?"]}, "end_turn": true, "weight": 1.0, "metadata": {"message_type": "next", "model_slug": "text-davinci-002-render-sha", "finish_details": {"type": "stop", "stop": "<|im_end|>"}}, "recipient": "all"}, "conversation_id": "UUID", "error": null}
return new Promise(async (resolve, reject) => {
let lastEvent = null;
const controller = new AbortController();
try {
let done = false;
await fetchEventSource(url, {
...opts,
signal: controller.signal,
signal: abortController.signal,
async onopen(response) {
if (response.status === 200) {
return;
Expand Down Expand Up @@ -94,7 +97,7 @@ export default class ChatGPTBrowserClient {
return;
}
onProgress('[DONE]');
controller.abort();
abortController.abort();
resolve(lastEvent);
}
},
Expand All @@ -114,7 +117,7 @@ export default class ChatGPTBrowserClient {
}
if (message.data === '[DONE]') {
onProgress('[DONE]');
controller.abort();
abortController.abort();
resolve(lastEvent);
done = true;
return;
Expand Down Expand Up @@ -177,6 +180,7 @@ export default class ChatGPTBrowserClient {
message,
},
opts.onProgress || (() => {}),
opts.abortController || new AbortController(),
);

if (this.options.debug) {
Expand Down
58 changes: 37 additions & 21 deletions src/ChatGPTClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ export default class ChatGPTClient {
this.conversationsCache = new Keyv(cacheOptions);
}

async getCompletion(prompt, onProgress) {
async getCompletion(prompt, onProgress, abortController = null) {
if (!abortController) {
abortController = new AbortController();
}
const modelOptions = { ...this.modelOptions };
if (typeof onProgress === 'function') {
modelOptions.stream = true;
Expand Down Expand Up @@ -93,12 +96,11 @@ export default class ChatGPTClient {
};
if (modelOptions.stream) {
return new Promise(async (resolve, reject) => {
const controller = new AbortController();
try {
let done = false;
await fetchEventSource(url, {
...opts,
signal: controller.signal,
signal: abortController.signal,
async onopen(response) {
if (response.status === 200) {
return;
Expand All @@ -124,7 +126,7 @@ export default class ChatGPTClient {
// workaround for private API not sending [DONE] event
if (!done) {
onProgress('[DONE]');
controller.abort();
abortController.abort();
resolve();
}
},
Expand All @@ -144,7 +146,7 @@ export default class ChatGPTClient {
}
if (message.data === '[DONE]') {
onProgress('[DONE]');
controller.abort();
abortController.abort();
resolve();
done = true;
return;
Expand All @@ -157,7 +159,13 @@ export default class ChatGPTClient {
}
});
}
const response = await fetch(url, opts);
const response = await fetch(
url,
{
...opts,
signal: abortController.signal,
},
);
if (response.status !== 200) {
const body = await response.text();
const error = new Error(`Failed to send message. HTTP ${response.status} - ${body}`);
Expand Down Expand Up @@ -201,22 +209,30 @@ export default class ChatGPTClient {
let reply = '';
let result = null;
if (typeof opts.onProgress === 'function') {
await this.getCompletion(prompt, (message) => {
if (message === '[DONE]') {
return;
}
const token = message.choices[0].text;
if (this.options.debug) {
console.debug(token);
}
if (token === this.endToken) {
return;
}
opts.onProgress(token);
reply += token;
});
await this.getCompletion(
prompt,
(message) => {
if (message === '[DONE]') {
return;
}
const token = message.choices[0].text;
if (this.options.debug) {
console.debug(token);
}
if (token === this.endToken) {
return;
}
opts.onProgress(token);
reply += token;
},
opts.abortController || new AbortController(),
);
} else {
result = await this.getCompletion(prompt, null);
result = await this.getCompletion(
prompt,
null,
opts.abortController || new AbortController(),
);
if (this.options.debug) {
console.debug(JSON.stringify(result));
}
Expand Down

0 comments on commit c95681b

Please sign in to comment.