Skip to content

Commit

Permalink
Use throwOnTimeout option for PQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny-signal authored Nov 23, 2021
1 parent bd6ee4b commit 76a30a5
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 10 deletions.
6 changes: 5 additions & 1 deletion ts/ConversationController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,11 @@ export class ConversationController {
`ConversationController: Removing ${temporaryConversations.length} temporary conversations`
);
}
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
const queue = new PQueue({
concurrency: 3,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
queue.addAll(
temporaryConversations.map(item => async () => {
await removeConversation(item.id, {
Expand Down
12 changes: 10 additions & 2 deletions ts/SignalProtocolStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,11 @@ export class SignalProtocolStore extends EventsMixin {
}

private _createSenderKeyQueue(): PQueue {
return new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
return new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
}

private _getSenderKeyQueue(senderId: QualifiedAddress): PQueue {
Expand Down Expand Up @@ -663,7 +667,11 @@ export class SignalProtocolStore extends EventsMixin {
}

private _createSessionQueue(): PQueue {
return new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
return new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
}

private _getSessionQueue(id: QualifiedAddress): PQueue {
Expand Down
6 changes: 5 additions & 1 deletion ts/components/emoji/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ const makeImagePath = (src: string) => {
return `${ROOT_PATH}node_modules/emoji-datasource-apple/img/apple/64/${src}`;
};

const imageQueue = new PQueue({ concurrency: 10, timeout: 1000 * 60 * 2 });
const imageQueue = new PQueue({
concurrency: 10,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
const images = new Set();

export const preloadImages = async (): Promise<void> => {
Expand Down
6 changes: 5 additions & 1 deletion ts/routineProfileRefresh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ export async function routineProfileRefresh({
}
}

const refreshQueue = new PQueue({ concurrency: 5, timeout: 1000 * 60 * 2 });
const refreshQueue = new PQueue({
concurrency: 5,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
for (const conversation of conversationsToRefresh) {
refreshQueue.add(() => refreshConversation(conversation));
}
Expand Down
14 changes: 12 additions & 2 deletions ts/textsecure/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,27 @@ export default class MessageReceiver
}
this.serverTrustRoot = Bytes.fromBase64(serverTrustRoot);

this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.incomingQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
this.appQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});

// All envelopes start in encryptedQueue and progress to decryptedQueue
this.encryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
this.decryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});

this.decryptAndCacheBatcher = createBatcher<CacheAddItemType>({
Expand Down
6 changes: 5 additions & 1 deletion ts/textsecure/WebAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,11 @@ export function initialize({
});

// Upload stickers
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
const queue = new PQueue({
concurrency: 3,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
await Promise.all(
stickers.map(async (sticker: ServerAttachmentType, index: number) => {
const stickerParams = makePutParams(
Expand Down
6 changes: 5 additions & 1 deletion ts/util/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ export function createBatcher<ItemType>(
let batcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null;
let items: Array<ItemType> = [];
const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
const queue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});

function _kickBatchOff() {
if (timeout) {
Expand Down
1 change: 1 addition & 0 deletions ts/util/callingTones.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Sound } from './Sound';
const ringtoneEventQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});

class CallingTones {
Expand Down
1 change: 1 addition & 0 deletions ts/util/sendToGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ export async function _waitForAll<T>({
const queue = new PQueue({
concurrency: maxConcurrency,
timeout: 2 * 60 * 1000,
throwOnTimeout: true,
});
return queue.addAll(tasks);
}
Expand Down
6 changes: 5 additions & 1 deletion ts/util/waitBatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ export function createWaitBatcher<ItemType>(
let waitBatcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null;
let items: Array<ItemHolderType<ItemType>> = [];
const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
const queue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});

async function _kickBatchOff() {
const itemsRef = items;
Expand Down

0 comments on commit 76a30a5

Please sign in to comment.