Skip to content

Commit

Permalink
feat: Add queue in enc/dec group message (WhiskeySockets#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
BochilGaming authored Jul 8, 2023
1 parent a1fb826 commit e0e7d40
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 26 deletions.
62 changes: 38 additions & 24 deletions WASignalGroup/group_cipher.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const queue_job = require('./queue_job');
const SenderKeyMessage = require('./sender_key_message');
const crypto = require('libsignal/src/crypto');

Expand All @@ -7,11 +8,22 @@ class GroupCipher {
this.senderKeyName = senderKeyName;
}

queueJob(awaitable) {
return queue_job(this.senderKeyName.toString(), awaitable)
}

async encrypt(paddedPlaintext) {
try {
return await this.queueJob(async () => {
const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName);
if (!record) {
throw new Error("No SenderKeyRecord found for encryption")
}
const senderKeyState = record.getSenderKeyState();
const senderKey = senderKeyState.getSenderChainKey().getSenderMessageKey();
if (!senderKeyState) {
throw new Error("No session to encrypt message");
}
const iteration = senderKeyState.getSenderChainKey().getIteration()
const senderKey = this.getSenderKey(senderKeyState, iteration === 0 ? 0 : iteration + 1)

const ciphertext = await this.getCipherText(
senderKey.getIv(),
Expand All @@ -25,35 +37,37 @@ class GroupCipher {
ciphertext,
senderKeyState.getSigningKeyPrivate()
);
senderKeyState.setSenderChainKey(senderKeyState.getSenderChainKey().getNext());
await this.senderKeyStore.storeSenderKey(this.senderKeyName, record);
return senderKeyMessage.serialize();
} catch (e) {
//console.log(e.stack);
throw new Error('NoSessionException');
}
return senderKeyMessage.serialize()
})
}

async decrypt(senderKeyMessageBytes) {
const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName);
if (!record) throw new Error(`No sender key for: ${this.senderKeyName}`);

const senderKeyMessage = new SenderKeyMessage(null, null, null, null, senderKeyMessageBytes);
return await this.queueJob(async () => {
const record = await this.senderKeyStore.loadSenderKey(this.senderKeyName);
if (!record) {
throw new Error("No SenderKeyRecord found for decryption")
}
const senderKeyMessage = new SenderKeyMessage(null, null, null, null, senderKeyMessageBytes);
const senderKeyState = record.getSenderKeyState(senderKeyMessage.getKeyId());
if (!senderKeyState) {
throw new Error("No session found to decrypt message")
}

const senderKeyState = record.getSenderKeyState(senderKeyMessage.getKeyId());
senderKeyMessage.verifySignature(senderKeyState.getSigningKeyPublic());
const senderKey = this.getSenderKey(senderKeyState, senderKeyMessage.getIteration());
// senderKeyState.senderKeyStateStructure.senderSigningKey.private =
senderKeyMessage.verifySignature(senderKeyState.getSigningKeyPublic());
const senderKey = this.getSenderKey(senderKeyState, senderKeyMessage.getIteration());
// senderKeyState.senderKeyStateStructure.senderSigningKey.private =

const plaintext = await this.getPlainText(
senderKey.getIv(),
senderKey.getCipherKey(),
senderKeyMessage.getCipherText()
);
const plaintext = await this.getPlainText(
senderKey.getIv(),
senderKey.getCipherKey(),
senderKeyMessage.getCipherText()
);

await this.senderKeyStore.storeSenderKey(this.senderKeyName, record);
await this.senderKeyStore.storeSenderKey(this.senderKeyName, record);

return plaintext;
return plaintext;
})
}

getSenderKey(senderKeyState, iteration) {
Expand All @@ -67,7 +81,7 @@ class GroupCipher {
);
}

if (senderChainKey.getIteration() - iteration > 2000) {
if (iteration - senderChainKey.getIteration() > 2000) {
throw new Error('Over 2000 messages into the future!');
}

Expand Down
69 changes: 69 additions & 0 deletions WASignalGroup/queue_job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// vim: ts=4:sw=4:expandtab

/*
* jobQueue manages multiple queues indexed by device to serialize
* session io ops on the database.
*/
'use strict';


const _queueAsyncBuckets = new Map();
const _gcLimit = 10000;

async function _asyncQueueExecutor(queue, cleanup) {
let offt = 0;
while (true) {
let limit = Math.min(queue.length, _gcLimit); // Break up thundering hurds for GC duty.
for (let i = offt; i < limit; i++) {
const job = queue[i];
try {
job.resolve(await job.awaitable());
} catch (e) {
job.reject(e);
}
}
if (limit < queue.length) {
/* Perform lazy GC of queue for faster iteration. */
if (limit >= _gcLimit) {
queue.splice(0, limit);
offt = 0;
} else {
offt = limit;
}
} else {
break;
}
}
cleanup();
}

module.exports = function (bucket, awaitable) {
/* Run the async awaitable only when all other async calls registered
* here have completed (or thrown). The bucket argument is a hashable
* key representing the task queue to use. */
if (!awaitable.name) {
// Make debuging easier by adding a name to this function.
Object.defineProperty(awaitable, 'name', { writable: true });
if (typeof bucket === 'string') {
awaitable.name = bucket;
} else {
console.warn("Unhandled bucket type (for naming):", typeof bucket, bucket);
}
}
let inactive;
if (!_queueAsyncBuckets.has(bucket)) {
_queueAsyncBuckets.set(bucket, []);
inactive = true;
}
const queue = _queueAsyncBuckets.get(bucket);
const job = new Promise((resolve, reject) => queue.push({
awaitable,
resolve,
reject
}));
if (inactive) {
/* An executor is not currently active; Start one now. */
_asyncQueueExecutor(queue, () => _queueAsyncBuckets.delete(bucket));
}
return job;
};
6 changes: 4 additions & 2 deletions WASignalGroup/sender_key_record.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ class SenderKeyRecord {
}

getSenderKeyState(keyId) {
if (!keyId && this.senderKeyStates.length) return this.senderKeyStates[0];
if (!keyId && this.senderKeyStates.length) return this.senderKeyStates[this.senderKeyStates.length - 1];
for (let i = 0; i < this.senderKeyStates.length; i++) {
const state = this.senderKeyStates[i];
if (state.getKeyId() === keyId) {
return state;
}
}
throw new Error(`No keys for: ${keyId}`);
}

addSenderKeyState(id, iteration, chainKey, signatureKey) {
this.senderKeyStates.push(new SenderKeyState(id, iteration, chainKey, null, signatureKey));
if (this.senderKeyStates.length > 5) {
this.senderKeyStates.shift()
}
}

setSenderKeyState(id, iteration, chainKey, keyPair) {
Expand Down

0 comments on commit e0e7d40

Please sign in to comment.