Skip to content

Commit

Permalink
Flush all watchers on empty queue
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny-signal authored Mar 26, 2021
1 parent 67892d8 commit 746e99b
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 85 deletions.
121 changes: 59 additions & 62 deletions ts/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export async function startApp(): Promise<void> {
});
window.Whisper.deliveryReceiptQueue.pause();
window.Whisper.deliveryReceiptBatcher = window.Signal.Util.createBatcher({
name: 'Whisper.deliveryReceiptBatcher',
wait: 500,
maxSize: 500,
processBatch: async (items: WhatIsThis) => {
Expand Down Expand Up @@ -2056,7 +2057,7 @@ export async function startApp(): Promise<void> {
async function onEmpty() {
await Promise.all([
window.waitForAllBatchers(),
window.waitForAllWaitBatchers(),
window.flushAllWaitBatchers(),
]);
window.log.info('onEmpty: All outstanding database requests complete');
initialLoadComplete = true;
Expand All @@ -2074,72 +2075,68 @@ export async function startApp(): Promise<void> {
logger: window.log,
});

let interval: NodeJS.Timer | null = setInterval(async () => {
const view = window.owsDesktopApp.appView;
if (view) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
clearInterval(interval!);
interval = null;
view.onEmpty();
window.Whisper.deliveryReceiptQueue.start();
window.Whisper.Notifications.enable();

window.logAppLoadedEvent();
if (messageReceiver) {
window.log.info(
'App loaded - messages:',
messageReceiver.getProcessedCount()
);
}
const view = window.owsDesktopApp.appView;
if (!view) {
throw new Error('Expected `appView` to be initialized');
}

window.sqlInitializer.goBackToMainProcess();
window.Signal.Util.setBatchingStrategy(false);

const attachmentDownloadQueue = window.attachmentDownloadQueue || [];

// NOTE: ts/models/messages.ts expects this global to become undefined
// once we stop processing the queue.
window.attachmentDownloadQueue = undefined;

const MAX_ATTACHMENT_MSGS_TO_DOWNLOAD = 250;
const attachmentsToDownload = attachmentDownloadQueue.filter(
(message, index) =>
index <= MAX_ATTACHMENT_MSGS_TO_DOWNLOAD ||
isMoreRecentThan(
message.getReceivedAt(),
MAX_ATTACHMENT_DOWNLOAD_AGE
) ||
// Stickers and long text attachments has to be downloaded for UI
// to display the message properly.
message.hasRequiredAttachmentDownloads()
);
window.log.info(
'Downloading recent attachments of total attachments',
attachmentsToDownload.length,
attachmentDownloadQueue.length
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
view.onEmpty();

if (window.startupProcessingQueue) {
window.startupProcessingQueue.flush();
window.startupProcessingQueue = undefined;
}
window.logAppLoadedEvent();
if (messageReceiver) {
window.log.info(
'App loaded - messages:',
messageReceiver.getProcessedCount()
);
}

const messagesWithDownloads = await Promise.all(
attachmentsToDownload.map(message =>
message.queueAttachmentDownloads()
)
);
const messagesToSave: Array<MessageAttributesType> = [];
messagesWithDownloads.forEach((shouldSave, messageKey) => {
if (shouldSave) {
const message = attachmentsToDownload[messageKey];
messagesToSave.push(message.attributes);
}
});
await window.Signal.Data.saveMessages(messagesToSave, {});
}
}, 500);
window.sqlInitializer.goBackToMainProcess();
window.Signal.Util.setBatchingStrategy(false);

window.Whisper.deliveryReceiptQueue.start();
window.Whisper.Notifications.enable();
const attachmentDownloadQueue = window.attachmentDownloadQueue || [];

// NOTE: ts/models/messages.ts expects this global to become undefined
// once we stop processing the queue.
window.attachmentDownloadQueue = undefined;

const MAX_ATTACHMENT_MSGS_TO_DOWNLOAD = 250;
const attachmentsToDownload = attachmentDownloadQueue.filter(
(message, index) =>
index <= MAX_ATTACHMENT_MSGS_TO_DOWNLOAD ||
isMoreRecentThan(
message.getReceivedAt(),
MAX_ATTACHMENT_DOWNLOAD_AGE
) ||
// Stickers and long text attachments has to be downloaded for UI
// to display the message properly.
message.hasRequiredAttachmentDownloads()
);
window.log.info(
'Downloading recent attachments of total attachments',
attachmentsToDownload.length,
attachmentDownloadQueue.length
);

if (window.startupProcessingQueue) {
window.startupProcessingQueue.flush();
window.startupProcessingQueue = undefined;
}

const messagesWithDownloads = await Promise.all(
attachmentsToDownload.map(message => message.queueAttachmentDownloads())
);
const messagesToSave: Array<MessageAttributesType> = [];
messagesWithDownloads.forEach((shouldSave, messageKey) => {
if (shouldSave) {
const message = attachmentsToDownload[messageKey];
messagesToSave.push(message.attributes);
}
});
await window.Signal.Data.saveMessages(messagesToSave, {});
}
function onReconnect() {
// We disable notifications on first connect, but the same applies to reconnect. In
Expand Down
1 change: 1 addition & 0 deletions ts/sql/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ async function getConversationById(
}

const updateConversationBatcher = createBatcher<ConversationType>({
name: 'sql.Client.updateConversationBatcher',
wait: 500,
maxSize: 20,
processBatch: async (items: Array<ConversationType>) => {
Expand Down
91 changes: 91 additions & 0 deletions ts/test-both/util/batcher_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { assert } from 'chai';
import * as sinon from 'sinon';

import { createBatcher } from '../../util/batcher';
import { sleep } from '../../util/sleep';

describe('batcher', () => {
it('should schedule a full batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createBatcher<number>({
name: 'test',
wait: 10,
maxSize: 2,
processBatch,
});

batcher.add(1);
batcher.add(2);

assert.ok(processBatch.calledOnceWith([1, 2]), 'Full batch on first call');
});

it('should schedule a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createBatcher<number>({
name: 'test',
wait: 5,
maxSize: 2,
processBatch,
});

batcher.add(1);

await sleep(10);

assert.ok(processBatch.calledOnceWith([1]), 'Partial batch after timeout');
});

it('should flushAndWait a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createBatcher<number>({
name: 'test',
wait: 10000,
maxSize: 1000,
processBatch,
});

batcher.add(1);

await batcher.flushAndWait();

assert.ok(
processBatch.calledOnceWith([1]),
'Partial batch after flushAndWait'
);
});

it('should flushAndWait a partial batch with new items added', async () => {
let calledTimes = 0;
const processBatch = async (batch: Array<number>): Promise<void> => {
calledTimes += 1;
if (calledTimes === 1) {
assert.deepEqual(batch, [1], 'First partial batch');
batcher.add(2);
} else if (calledTimes === 2) {
assert.deepEqual(batch, [2], 'Second partial batch');
} else {
assert.strictEqual(calledTimes, 2);
}
};

const batcher = createBatcher<number>({
name: 'test',
wait: 10000,
maxSize: 1000,
processBatch,
});

batcher.add(1);

await batcher.flushAndWait();

assert.strictEqual(calledTimes, 2);
});
});
80 changes: 80 additions & 0 deletions ts/test-both/util/waitBatcher_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { assert } from 'chai';
import * as sinon from 'sinon';

import { createWaitBatcher } from '../../util/waitBatcher';

describe('waitBatcher', () => {
it('should schedule a full batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createWaitBatcher<number>({
name: 'test',
wait: 10,
maxSize: 2,
processBatch,
});

await Promise.all([batcher.add(1), batcher.add(2)]);

assert.ok(processBatch.calledOnceWith([1, 2]), 'Full batch on first call');
});

it('should schedule a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createWaitBatcher<number>({
name: 'test',
wait: 10,
maxSize: 2,
processBatch,
});

await batcher.add(1);

assert.ok(processBatch.calledOnceWith([1]), 'Partial batch on timeout');
});

it('should flush a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createWaitBatcher<number>({
name: 'test',
wait: 10000,
maxSize: 1000,
processBatch,
});

await Promise.all([batcher.add(1), batcher.flushAndWait()]);

assert.ok(
processBatch.calledOnceWith([1]),
'Partial batch on flushAndWait'
);
});

it('should flush a partial batch with new items added', async () => {
const processBatch = sinon.fake.resolves(undefined);

const batcher = createWaitBatcher<number>({
name: 'test',
wait: 10000,
maxSize: 1000,
processBatch,
});

await Promise.all([
(async () => {
await batcher.add(1);
await batcher.add(2);
})(),
batcher.flushAndWait(),
]);

assert(processBatch.firstCall.calledWith([1]), 'First partial batch');
assert(processBatch.secondCall.calledWith([2]), 'Second partial batch');
assert(!processBatch.thirdCall);
});
});
17 changes: 15 additions & 2 deletions ts/textsecure/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,23 @@ class MessageReceiverInner extends EventTarget {
this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });

this.cacheAddBatcher = createBatcher<CacheAddItemType>({
name: 'MessageReceiver.cacheAddBatcher',
wait: 200,
maxSize: 30,
processBatch: this.cacheAndQueueBatch.bind(this),
processBatch: (items: Array<CacheAddItemType>) => {
// Not returning the promise here because we don't want to stall
// the batch.
this.cacheAndQueueBatch(items);
},
});
this.cacheUpdateBatcher = createBatcher<CacheUpdateItemType>({
name: 'MessageReceiver.cacheUpdateBatcher',
wait: 500,
maxSize: 30,
processBatch: this.cacheUpdateBatch.bind(this),
});
this.cacheRemoveBatcher = createBatcher<string>({
name: 'MessageReceiver.cacheRemoveBatcher',
wait: 500,
maxSize: 30,
processBatch: this.cacheRemoveBatch.bind(this),
Expand Down Expand Up @@ -507,7 +514,13 @@ class MessageReceiverInner extends EventTarget {
}

onEmpty() {
const emitEmpty = () => {
const emitEmpty = async () => {
await Promise.all([
this.cacheAddBatcher.flushAndWait(),
this.cacheUpdateBatcher.flushAndWait(),
this.cacheRemoveBatcher.flushAndWait(),
]);

window.log.info("MessageReceiver: emitting 'empty' event");
const ev = new Event('empty');
this.dispatchEvent(ev);
Expand Down
Loading

0 comments on commit 746e99b

Please sign in to comment.