Skip to content

Commit

Permalink
Merge pull request #20 from lokwkin/feature/refactor-and-decoupling
Browse files Browse the repository at this point in the history
Feature/refactor and decoupling
  • Loading branch information
lokwkin authored Jan 7, 2023
2 parents b67891a + 4275f19 commit 065d495
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 86 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ It also incorporates ***queue mechanism*** with redis, so that it is more flexib

## Start Modes
This app has two modes to start:
1. `slackbot` - listens to slack event for user requests, put request to redis as queue
2. `chatgpt` - serves as queue worker that listens to queue, forward user's questions to chatgpt, and write to slack on answer.
1. `slackbot` - listens to slack event for user requests, put request to redis queue, reply to slack on answer received.
2. `chatgpt` - serves as queue worker that listens to queue, forward user's questions to chatgpt, and put response back to queue on answer received.

## Setup

Expand Down Expand Up @@ -37,12 +37,14 @@ docker run chatgpt_slackbot
|`SLACK_BOT_TOKEN`|Y|Your Slack Bot token. See https://api.slack.com/|
|`SLACK_APP_TOKEN`|Y|Your Slack App token. See https://api.slack.com/|
|`SLACK_BOT_USER_ID`|Y|The User ID of your Slack Bot. See https://api.slack.com/|
|`SLACK_REACTION_LOADING`|N|The emoji to react when loading a question, default `thinking_face`
|`SLACK_REACTION_SUCCESS`|Y|The emoji to react when the prompt is answered, default `white_trade_mark`
|`SLACK_REACTION_FAILED`|Y|The emoji to react when failed when processing, default `x`
|`CHATGPT_EMAIL`|Y|The email of your chatgpt account|
|`CHATGPT_PASSWORD`|Y|The password of your chatgpt account|
|`CHATGPT_PROXY_SERVER`|N|e.g.: 12.123.234.345:23456, leave it blank if not used|
|`CHATGPT_IS_GOOGLE_LOGIN`|N|1 or 0, default 0|
|`CHATGPT_REQUEST_TIMEOUT_MS`|N|Timeout value for chatgpt request. default 300000 (5min)|
|`QUEUE_INTERVAL_MS`|N|Interval between handling each queue item in ms. default 3000|

## Usage
- The slackbot will listen to two types of event in slack workspace
Expand Down
33 changes: 16 additions & 17 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,27 @@ async function main() {
if (!START_MODE_OPTIONS.includes(process.env.START_MODE)) {
throw new Error('Invalid start mode');
}
if (!process.env.SLACK_BOT_TOKEN || !process.env.SLACK_APP_TOKEN) {
throw new Error('Missing slack token');
};

RedisAgent.initialize({
redisUrl: process.env.REDIS_URL,
});

const slackBot = new ChatGtpSlackBot({
slackBotToken: process.env.SLACK_BOT_TOKEN,
slackAppToken: process.env.SLACK_APP_TOKEN,
});

if (process.env.START_MODE === 'slackbot') {

if (!process.env.SLACK_BOT_TOKEN || !process.env.SLACK_APP_TOKEN) {
throw new Error('Missing slack token');
};

const slackBot = new ChatGtpSlackBot({
slackBotToken: process.env.SLACK_BOT_TOKEN,
slackAppToken: process.env.SLACK_APP_TOKEN,
reactions: {
loading: process.env.SLACK_REACTION_LOADING,
success: process.env.SLACK_REACTION_SUCCESS,
failed: process.env.SLACK_REACTION_FAILED,
},
});

await slackBot.listen();

} else if (process.env.START_MODE === 'chatgpt') {
Expand All @@ -41,18 +48,10 @@ async function main() {
isGoogleLogin: Boolean(Number(process.env.CHATGPT_IS_GOOGLE_LOGIN)),
proxyServer: process.env.CHATGPT_PROXY_SERVER,
requestTimeoutMs: Number(process.env.CHATGPT_REQUEST_TIMEOUT_MS || 300000),
queueIntervalMs: Number(process.env.QUEUE_INTERVAL_MS || 3000),
});

chatGptClient.setCallbacks(async (answer, question, slackMeta, chatgptClientId) => {
await slackBot.replyAnswer(answer, question, slackMeta, chatgptClientId);
}, async (error, question, slackMeta, chatgptClientId) => {
await slackBot.replyError(error, question, slackMeta, chatgptClientId);
});

await chatGptClient.startChatGptSession();
await chatGptClient.startListenQueue();

await chatGptClient.listenQuestion();
}

}
Expand Down
111 changes: 61 additions & 50 deletions src/chatgpt-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import crypto from 'crypto';
* @property {boolean} [isGoogleLogin]
* @property {string|undefined} [proxyServer]
* @property {number} [requestTimeoutMs]
* @property {number} [queueIntervalMs]
*/

/**
* @typedef ChatGptQuestion
* @property {string} prompt
* @property {string} [conversationId]
* @property {string} [parentMessageId]
* @property {string} [responseQueue]
*/

/**
Expand All @@ -28,22 +28,19 @@ import crypto from 'crypto';
*/

/**
* @callback AnswerCallback
* @param {ChatGptAnswer} answer
* @param {ChatGptQuestion} question
* @param {SlackMeta} slackMeta
* @param {string} chatgptClientId
* @typedef ChatGptCallbackParam
* @property {boolean} success
* @property {string} handlerId
* @property {ChatGptQuestion} question
* @property {ChatGptAnswer} [answer]
* @property {Error} [error]
* @property {*} [extra]
*/

/**
* @callback ErrorCallback
* @param {Error} err
* @param {ChatGptQuestion} question
* @param {SlackMeta} slackMeta
* @param {string} chatgptClientId
* @callback ChatGptCallback
* @param {ChatGptCallbackParam} param
*/


class ChatGptClient {

/**
Expand All @@ -63,43 +60,30 @@ class ChatGptClient {
this.accEmail = args.accEmail;

/** @type {string} */
this.clientId = this._obtainClientId();
this.handlerId = this._obtainHandlerId();

/** @type {number} */
this.requestTimeoutMs = args.requestTimeoutMs ?? 5 * 60 * 1000;

/** @type {number} */
this.queueIntervalMs = args.queueIntervalMs ?? 3000;

/** @type {AnswerCallback} */
this.answerCallback = null;

/** @type {ErrorCallback} */
this.errorCallback = null;
}

/**
* @param {AnswerCallback} answerCallback
* @param {ErrorCallback} errorCallback
*/
setCallbacks(answerCallback, errorCallback) {
this.answerCallback = answerCallback;
this.errorCallback = errorCallback;
}

/**
* Ask ChatGPT asyncrhonously
* @param {ChatGptQuestion} question
* @param {SlackMeta} slackMeta
* @param {string} [chatgptClientId]
* Ask ChatGPT Asyncrhonously. Requests will be enqueued to a queue system for handlers to process.
* The result will be returned through an answer queue provided by caller.
* @param {ChatGptQuestion} question Question
* @param {object} opts
* @param {string} opts.responseQueue The name of the queue that the answer should be enqueued to.
* @param {string} [opts.handlerId] In case a specific handler should be used to answer the question. Mostly used in case of follow up questions.
* @param {*} [opts.extra] Any extra information that will returned along with the answer.
* @return {Promise<ChatResponse>}
*/
static async ask(question, slackMeta, chatgptClientId = undefined) {
static async ask(question, opts) {

const { responseQueue, handlerId, extra } = opts;

if (chatgptClientId) {
await RedisAgent.getInstance().enqueue(`ChatGptClient.${chatgptClientId}`, { question, slackMeta });
if (handlerId) {
await RedisAgent.getInstance().enqueue(`queues.questions.handler.${handlerId}`, { question, extra, responseQueue });
} else {
await RedisAgent.getInstance().enqueue(`ChatGptClient.COMMON`, { question, slackMeta });
await RedisAgent.getInstance().enqueue(`queues.questions.handler.common`, { question, extra, responseQueue });
}
}

Expand All @@ -108,7 +92,7 @@ class ChatGptClient {
* @returns {Promise<void>}
*/
async startChatGptSession() {
console.info(`[${new Date().toISOString()}] CHATGPT_CONNECTING_SESSION`);
console.info(`[${new Date().toISOString()}] CHATGPT_CONNECTING_SESSION <${this.accEmail}>`);
try {
await this.chatApi.initSession();
} catch (err) {
Expand All @@ -122,27 +106,54 @@ class ChatGptClient {
* The account specific queue is used in case that a root question is processed by a specific account previously
* therefore its follow-up must also be processed by the same account.
*/
async startListenQueue() {
console.info(`[${new Date().toISOString()}] CHATGPT_START_LISTEN_QUEUE`);
async listenQuestion() {
console.info(`[${new Date().toISOString()}] CHATGPT_START_LISTEN_QUEUE <${this.accEmail}>`);
while (true) {
await this._popAndHandle(`ChatGptClient.COMMON`);
await this._popAndHandle(`ChatGptClient.${this.clientId}`);
await new Promise(r => setTimeout(r, this.queueIntervalMs));
await this._popAndHandle(`queues.questions.handler.common`);
await this._popAndHandle(`queues.questions.handler.${this.handlerId}`);
await new Promise(r => setTimeout(r, 1000));
}
}

/**
* @param {string} answerQueueName
* @param {ChatGptCallback} callback
*/
static async listenAnswer(answerQueueName, callback) {
while (true) {
/** @type {ChatGptCallbackParam} */
let item = await RedisAgent.getInstance().dequeue(answerQueueName);
if (item) {
await callback(item);
}
await new Promise(r => setTimeout(r, 1000));
}
}

/**
* Pops one item from queue and try to handle it
* Pops one item from queue and try to handle it.
* @param {string} queueName
*/
async _popAndHandle(queueName) {
let item = await RedisAgent.getInstance().dequeue(queueName);
if (item) {
try {
const answer = await this._handleAsk(item.question, true);
await this.answerCallback(answer, item.question, item.slackMeta, this.clientId);
await RedisAgent.getInstance().enqueue(item.responseQueue, {
success: true,
answer,
question: item.question,
extra: item.extra,
handlerId: this.handlerId
});
} catch (err) {
await this.errorCallback(err, item.question, item.slackMeta, this.clientId);
await RedisAgent.getInstance().enqueue(item.responseQueue, {
success: false,
error: err,
question: item.question,
extra: item.extra,
handlerId: this.handlerId
});
}
}
}
Expand Down Expand Up @@ -182,7 +193,7 @@ class ChatGptClient {
/**
* Returns a hash string from account email
*/
_obtainClientId() {
_obtainHandlerId() {
const hash = crypto.createHash('sha256');
hash.update(this.accEmail);
const hashedEmail = hash.digest('hex');
Expand Down
Loading

0 comments on commit 065d495

Please sign in to comment.