Skip to content

Commit

Permalink
Retry delivery and read receipts for up to 24 hours
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn-Signal authored Dec 7, 2021
1 parent e81821f commit f9e9883
Show file tree
Hide file tree
Showing 15 changed files with 316 additions and 243 deletions.
57 changes: 6 additions & 51 deletions ts/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import type {
} from './model-types.d';
import * as Bytes from './Bytes';
import * as Timers from './Timers';
import type { WhatIsThis, DeliveryReceiptBatcherItemType } from './window.d';
import type { WhatIsThis } from './window.d';
import type { Receipt } from './types/Receipt';
import { getTitleBarVisibility, TitleBarVisibility } from './types/Settings';
import { SocketStatus } from './types/SocketStatus';
import { DEFAULT_CONVERSATION_COLOR } from './types/Colors';
Expand Down Expand Up @@ -131,6 +132,7 @@ import { ToastConversationArchived } from './components/ToastConversationArchive
import { ToastConversationUnarchived } from './components/ToastConversationUnarchived';
import { showToast } from './util/showToast';
import { startInteractionMode } from './windows/startInteractionMode';
import { deliveryReceiptsJobQueue } from './jobs/deliveryReceiptsJobQueue';

const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000;

Expand Down Expand Up @@ -378,59 +380,12 @@ export async function startApp(): Promise<void> {
});
window.Whisper.deliveryReceiptQueue.pause();
window.Whisper.deliveryReceiptBatcher =
window.Signal.Util.createBatcher<DeliveryReceiptBatcherItemType>({
window.Signal.Util.createBatcher<Receipt>({
name: 'Whisper.deliveryReceiptBatcher',
wait: 500,
maxSize: 100,
processBatch: async items => {
const byConversationId = window._.groupBy(items, item =>
window.ConversationController.ensureContactIds({
e164: item.source,
uuid: item.sourceUuid,
})
);
const ids = Object.keys(byConversationId);

for (let i = 0, max = ids.length; i < max; i += 1) {
const conversationId = ids[i];
const ourItems = byConversationId[conversationId];
const timestamps = ourItems.map(item => item.timestamp);
const messageIds = ourItems.map(item => item.messageId);

const c = window.ConversationController.get(conversationId);
if (!c) {
log.warn(
`deliveryReceiptBatcher: Conversation ${conversationId} does not exist! ` +
`Will not send delivery receipts for timestamps ${timestamps}`
);
continue;
}

const senderUuid = c.get('uuid');
const senderE164 = c.get('e164');

c.queueJob('sendDeliveryReceipt', async () => {
try {
const sendOptions = await getSendOptions(c.attributes);

// eslint-disable-next-line no-await-in-loop
await handleMessageSend(
window.textsecure.messaging.sendDeliveryReceipt({
senderE164,
senderUuid,
timestamps,
options: sendOptions,
}),
{ messageIds, sendType: 'deliveryReceipt' }
);
} catch (error) {
log.error(
`Failed to send delivery receipt to ${senderE164}/${senderUuid} for timestamps ${timestamps}:`,
error && error.stack ? error.stack : error
);
}
});
}
processBatch: async deliveryReceipts => {
await deliveryReceiptsJobQueue.add({ deliveryReceipts });
},
});

Expand Down
45 changes: 45 additions & 0 deletions ts/jobs/deliveryReceiptsJobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { z } from 'zod';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import { receiptSchema, ReceiptType } from '../types/Receipt';
import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers';

import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';

const deliveryReceiptsJobDataSchema = z.object({
deliveryReceipts: receiptSchema.array(),
});

type DeliveryReceiptsJobData = z.infer<typeof deliveryReceiptsJobDataSchema>;

export class DeliveryReceiptsJobQueue extends JobQueue<DeliveryReceiptsJobData> {
protected parseData(data: unknown): DeliveryReceiptsJobData {
return deliveryReceiptsJobDataSchema.parse(data);
}

protected async run(
{
data,
timestamp,
}: Readonly<{ data: DeliveryReceiptsJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
await runReceiptJob({
attempt,
log,
timestamp,
receipts: data.deliveryReceipts,
type: ReceiptType.Delivery,
});
}
}

export const deliveryReceiptsJobQueue = new DeliveryReceiptsJobQueue({
store: jobQueueDatabaseStore,
queueType: 'delivery receipts',
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
});
42 changes: 42 additions & 0 deletions ts/jobs/helpers/receiptHelpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import * as durations from '../../util/durations';
import type { LoggerType } from '../../types/Logging';
import type { Receipt, ReceiptType } from '../../types/Receipt';
import { sendReceipts } from '../../util/sendReceipts';
import { commonShouldJobContinue } from './commonShouldJobContinue';
import { handleCommonJobRequestError } from './handleCommonJobRequestError';

export const MAX_RETRY_TIME = durations.DAY;

export async function runReceiptJob({
attempt,
log,
timestamp,
receipts,
type,
}: Readonly<{
attempt: number;
log: LoggerType;
receipts: ReadonlyArray<Receipt>;
timestamp: number;
type: ReceiptType;
}>): Promise<void> {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();

const shouldContinue = await commonShouldJobContinue({
attempt,
log,
timeRemaining,
});
if (!shouldContinue) {
return;
}

try {
await sendReceipts({ log, receipts, type });
} catch (err: unknown) {
await handleCommonJobRequestError({ err, log, timeRemaining });
}
}
4 changes: 4 additions & 0 deletions ts/jobs/initializeAllJobQueues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import type { WebAPIType } from '../textsecure/WebAPI';

import { deliveryReceiptsJobQueue } from './deliveryReceiptsJobQueue';
import { normalMessageSendJobQueue } from './normalMessageSendJobQueue';
import { reactionJobQueue } from './reactionJobQueue';
import { readReceiptsJobQueue } from './readReceiptsJobQueue';
import { readSyncJobQueue } from './readSyncJobQueue';
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
import { reportSpamJobQueue } from './reportSpamJobQueue';
Expand All @@ -21,8 +23,10 @@ export function initializeAllJobQueues({
}): void {
reportSpamJobQueue.initialize({ server });

deliveryReceiptsJobQueue.streamJobs();
normalMessageSendJobQueue.streamJobs();
reactionJobQueue.streamJobs();
readReceiptsJobQueue.streamJobs();
readSyncJobQueue.streamJobs();
removeStorageKeyJobQueue.streamJobs();
reportSpamJobQueue.streamJobs();
Expand Down
56 changes: 56 additions & 0 deletions ts/jobs/readReceiptsJobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { z } from 'zod';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { StorageInterface } from '../types/Storage.d';
import type { Receipt } from '../types/Receipt';
import { receiptSchema, ReceiptType } from '../types/Receipt';
import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers';

import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';

const readReceiptsJobDataSchema = z.object({
readReceipts: receiptSchema.array(),
});

type ReadReceiptsJobData = z.infer<typeof readReceiptsJobDataSchema>;

export class ReadReceiptsJobQueue extends JobQueue<ReadReceiptsJobData> {
public async addIfAllowedByUser(
storage: Pick<StorageInterface, 'get'>,
readReceipts: Array<Receipt>
): Promise<void> {
if (storage.get('read-receipt-setting')) {
await this.add({ readReceipts });
}
}

protected parseData(data: unknown): ReadReceiptsJobData {
return readReceiptsJobDataSchema.parse(data);
}

protected async run(
{
data,
timestamp,
}: Readonly<{ data: ReadReceiptsJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
await runReceiptJob({
attempt,
log,
timestamp,
receipts: data.readReceipts,
type: ReceiptType.Read,
});
}
}

export const readReceiptsJobQueue = new ReadReceiptsJobQueue({
store: jobQueueDatabaseStore,
queueType: 'read receipts',
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
});
34 changes: 7 additions & 27 deletions ts/jobs/viewedReceiptsJobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,15 @@
// SPDX-License-Identifier: AGPL-3.0-only

import { z } from 'zod';
import * as durations from '../util/durations';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
import { sendViewedReceipt } from '../util/sendViewedReceipt';
import { receiptSchema, ReceiptType } from '../types/Receipt';
import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers';

import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { handleCommonJobRequestError } from './helpers/handleCommonJobRequestError';

const MAX_RETRY_TIME = durations.DAY;

const viewedReceiptsJobDataSchema = z.object({
viewedReceipt: z.object({
messageId: z.string(),
senderE164: z.string().optional(),
senderUuid: z.string().optional(),
timestamp: z.number(),
}),
});
const viewedReceiptsJobDataSchema = z.object({ viewedReceipt: receiptSchema });

type ViewedReceiptsJobData = z.infer<typeof viewedReceiptsJobDataSchema>;

Expand All @@ -37,22 +26,13 @@ export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
}: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();

const shouldContinue = await commonShouldJobContinue({
await runReceiptJob({
attempt,
log,
timeRemaining,
timestamp,
receipts: [data.viewedReceipt],
type: ReceiptType.Viewed,
});
if (!shouldContinue) {
return;
}

try {
await sendViewedReceipt(data.viewedReceipt, log);
} catch (err: unknown) {
await handleCommonJobRequestError({ err, log, timeRemaining });
}
}
}

Expand Down
23 changes: 10 additions & 13 deletions ts/models/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import { isConversationAccepted } from '../util/isConversationAccepted';
import { markConversationRead } from '../util/markConversationRead';
import { handleMessageSend } from '../util/handleMessageSend';
import { getConversationMembers } from '../util/getConversationMembers';
import { sendReadReceiptsFor } from '../util/sendReadReceiptsFor';
import { updateConversationsWithUuidLookup } from '../updateConversationsWithUuidLookup';
import { ReadStatus } from '../messages/MessageReadStatus';
import { SendStatus } from '../messages/MessageSendState';
Expand Down Expand Up @@ -92,6 +91,7 @@ import {
getMessagePropStatus,
} from '../state/selectors/message';
import { normalMessageSendJobQueue } from '../jobs/normalMessageSendJobQueue';
import { readReceiptsJobQueue } from '../jobs/readReceiptsJobQueue';
import { Deletes } from '../messageModifiers/Deletes';
import type { ReactionModel } from '../messageModifiers/Reactions';
import { isAnnouncementGroupReady } from '../util/isAnnouncementGroupReady';
Expand Down Expand Up @@ -1976,21 +1976,18 @@ export class ConversationModel extends window.Backbone
const readMessages = messages.filter(
m => !hasErrors(m.attributes) && isIncoming(m.attributes)
);
const receiptSpecs = readMessages.map(m => ({
messageId: m.id,
senderE164: m.get('source'),
senderUuid: m.get('sourceUuid'),
senderId: window.ConversationController.ensureContactIds({
e164: m.get('source'),
uuid: m.get('sourceUuid'),
}),
timestamp: m.get('sent_at'),
hasErrors: hasErrors(m.attributes),
}));

if (isLocalAction) {
// eslint-disable-next-line no-await-in-loop
await sendReadReceiptsFor(this.attributes, receiptSpecs);
await readReceiptsJobQueue.addIfAllowedByUser(
window.storage,
readMessages.map(m => ({
messageId: m.id,
senderE164: m.get('source'),
senderUuid: m.get('sourceUuid'),
timestamp: m.get('sent_at'),
}))
);
}

// eslint-disable-next-line no-await-in-loop
Expand Down
4 changes: 2 additions & 2 deletions ts/models/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2524,8 +2524,8 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
window.Whisper.deliveryReceiptQueue.add(() => {
window.Whisper.deliveryReceiptBatcher.add({
messageId,
source,
sourceUuid,
senderE164: source,
senderUuid: sourceUuid,
timestamp: this.get('sent_at'),
});
});
Expand Down
4 changes: 2 additions & 2 deletions ts/textsecure/SendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ export default class MessageSender {
});
}

async sendReadReceipts(
async sendReadReceipt(
options: Readonly<{
senderE164?: string;
senderUuid?: string;
Expand All @@ -1598,7 +1598,7 @@ export default class MessageSender {
});
}

async sendViewedReceipts(
async sendViewedReceipt(
options: Readonly<{
senderE164?: string;
senderUuid?: string;
Expand Down
Loading

0 comments on commit f9e9883

Please sign in to comment.