Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] development from signalapp:development #2

Merged
merged 3 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Private messaging from your desktop",
"desktopName": "signal.desktop",
"repository": "https://github.com/signalapp/Signal-Desktop.git",
"version": "5.0.0-beta.3",
"version": "5.0.0-beta.4",
"license": "AGPL-3.0-only",
"author": {
"name": "Open Whisper Systems",
Expand Down
3 changes: 1 addition & 2 deletions ts/models/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3820,8 +3820,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
Math.min(readSync.get('read_at'), Date.now())
);
}
}
if (readSync || message.isExpirationTimerUpdate()) {

message.unset('unread');
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
Expand Down
175 changes: 95 additions & 80 deletions ts/services/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
import { ConversationModel } from '../models/conversations';
import { storageJobQueue } from '../util/JobQueue';
import { sleep } from '../util/sleep';
import { isMoreRecentThan } from '../util/timestamp';
import { isStorageWriteFeatureEnabled } from '../storage/isFeatureEnabled';

const {
Expand All @@ -41,7 +42,7 @@ const {

let consecutiveStops = 0;
let consecutiveConflicts = 0;
const forcedPushBucket: Array<number> = [];
const uploadBucket: Array<number> = [];

const validRecordTypes = new Set([
0, // UNKNOWN
Expand Down Expand Up @@ -125,6 +126,7 @@ type GeneratedManifestType = {

async function generateManifest(
version: number,
previousManifest?: ManifestRecordClass,
isNewManifest = false
): Promise<GeneratedManifestType> {
window.log.info(
Expand All @@ -138,6 +140,7 @@ async function generateManifest(
const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type;

const conversationsToUpdate = [];
const insertKeys: Array<string> = [];
const deleteKeys: Array<ArrayBuffer> = [];
const manifestRecordKeys: Set<ManifestRecordIdentifierClass> = new Set();
const newItems: Set<StorageItemClass> = new Set();
Expand Down Expand Up @@ -206,6 +209,7 @@ async function generateManifest(
newItems.add(storageItem);

if (storageID) {
insertKeys.push(storageID);
window.log.info(
'storageService.generateManifest: new key',
conversation.idForLogging(),
Expand Down Expand Up @@ -343,6 +347,60 @@ async function generateManifest(

storageKeyDuplicates.clear();

// If we have a copy of what the current remote manifest is then we run these
// additional validations comparing our pending manifest to the remote
// manifest:
if (previousManifest) {
const pendingInserts: Set<string> = new Set();
const pendingDeletes: Set<string> = new Set();

const remoteKeys: Set<string> = new Set();
previousManifest.keys.forEach(
(identifier: ManifestRecordIdentifierClass) => {
const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer());
remoteKeys.add(storageID);
}
);

const localKeys: Set<string> = new Set();
manifestRecordKeys.forEach((identifier: ManifestRecordIdentifierClass) => {
const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer());
localKeys.add(storageID);

if (!remoteKeys.has(storageID)) {
pendingInserts.add(storageID);
}
});

remoteKeys.forEach(storageID => {
if (!localKeys.has(storageID)) {
pendingDeletes.add(storageID);
}
});

if (deleteKeys.length !== pendingDeletes.size) {
throw new Error('invalid write delete keys length do not match');
}
if (newItems.size !== pendingInserts.size) {
throw new Error('invalid write insert items length do not match');
}
deleteKeys.forEach(key => {
const storageID = arrayBufferToBase64(key);
if (!pendingDeletes.has(storageID)) {
throw new Error(
'invalid write delete key missing from pending deletes'
);
}
});
insertKeys.forEach(storageID => {
if (!pendingInserts.has(storageID)) {
throw new Error(
'invalid write insert key missing from pending inserts'
);
}
});
}

const manifestRecord = new window.textsecure.protobuf.ManifestRecord();
manifestRecord.version = version;
manifestRecord.keys = Array.from(manifestRecordKeys);
Expand Down Expand Up @@ -488,7 +546,7 @@ async function createNewManifest() {
conversationsToUpdate,
newItems,
storageManifest,
} = await generateManifest(version, true);
} = await generateManifest(version, undefined, true);

await uploadManifest(version, {
conversationsToUpdate,
Expand Down Expand Up @@ -714,17 +772,7 @@ async function processManifest(
});
});

// if the remote only keys are larger or equal to our local keys then it
// was likely a forced push of storage service. We keep track of these
// merges so that we can detect possible infinite loops
const isForcePushed = remoteOnlyRecords.size >= localKeys.size;

const conflictCount = await processRemoteRecords(
remoteOnlyRecords,
isForcePushed
);

let hasConflicts = conflictCount !== 0;
const conflictCount = await processRemoteRecords(remoteOnlyRecords);

// Post-merge, if our local records contain any storage IDs that were not
// present in the remote manifest then we'll need to clear it, generate a
Expand All @@ -739,21 +787,16 @@ async function processManifest(
redactStorageID(storageID),
conversation.idForLogging()
);
conversation.set({
needsStorageServiceSync: true,
storageID: undefined,
});
conversation.unset('storageID');
updateConversation(conversation.attributes);
hasConflicts = true;
}
});

return hasConflicts;
return conflictCount !== 0;
}

async function processRemoteRecords(
remoteOnlyRecords: Map<string, RemoteRecord>,
isForcePushed = false
remoteOnlyRecords: Map<string, RemoteRecord>
): Promise<number> {
const storageKeyBase64 = window.storage.get('storageKey');
const storageKey = base64ToArrayBuffer(storageKeyBase64);
Expand Down Expand Up @@ -916,50 +959,6 @@ async function processRemoteRecords(
// fresh.
window.storage.put('storage-service-error-records', newRecordsWithErrors);

const now = Date.now();

if (isForcePushed) {
window.log.info(
'storageService.processRemoteRecords: remote manifest was likely force pushed',
now
);
forcedPushBucket.push(now);

// we need to check our conversations because maybe all of them were not
// updated properly, for those that weren't we'll clear their storage
// key so that they can be included in the next update
window.getConversations().forEach((conversation: ConversationModel) => {
const storageID = conversation.get('storageID');
if (storageID && !remoteOnlyRecords.has(storageID)) {
window.log.info(
'storageService.processRemoteRecords: clearing storageID',
conversation.idForLogging()
);
conversation.unset('storageID');
}
});

if (forcedPushBucket.length >= 3) {
const [firstMostRecentForcedPush] = forcedPushBucket;

if (now - firstMostRecentForcedPush < 5 * MINUTE) {
window.log.info(
'storageService.processRemoteRecords: thrasing? Backing off'
);
const error = new Error();
error.code = 'E_BACKOFF';
throw error;
}

window.log.info(
'storageService.processRemoteRecords: thrash timestamp of first -> now',
firstMostRecentForcedPush,
now
);
forcedPushBucket.shift();
}
}

if (conflictCount !== 0) {
window.log.info(
'storageService.processRemoteRecords: ' +
Expand All @@ -980,13 +979,13 @@ async function processRemoteRecords(
return 0;
}

async function sync(): Promise<void> {
async function sync(): Promise<ManifestRecordClass | undefined> {
if (!isStorageWriteFeatureEnabled()) {
window.log.info(
'storageService.sync: Not starting desktop.storage is falsey'
);

return;
return undefined;
}

if (!window.storage.get('storageKey')) {
Expand All @@ -995,6 +994,7 @@ async function sync(): Promise<void> {

window.log.info('storageService.sync: starting...');

let manifest: ManifestRecordClass | undefined;
try {
// If we've previously interacted with strage service, update 'fetchComplete' record
const previousFetchComplete = window.storage.get('storageFetchComplete');
Expand All @@ -1004,12 +1004,12 @@ async function sync(): Promise<void> {
}

const localManifestVersion = manifestFromStorage || 0;
const manifest = await fetchManifest(localManifestVersion);
manifest = await fetchManifest(localManifestVersion);

// Guarding against no manifests being returned, everything should be ok
if (!manifest) {
window.log.info('storageService.sync: no new manifest');
return;
return undefined;
}

const version = manifest.version.toNumber();
Expand All @@ -1032,16 +1032,10 @@ async function sync(): Promise<void> {
'storageService.sync: error processing manifest',
err && err.stack ? err.stack : String(err)
);

// When we're told to backoff, backoff to the max which should be
// ~5 minutes. If this job was running inside a queue it'll probably time
// out.
if (err.code === 'E_BACKOFF') {
await backOff(9001);
}
}

window.log.info('storageService.sync: complete');
return manifest;
}

async function upload(fromSync = false): Promise<void> {
Expand All @@ -1057,6 +1051,22 @@ async function upload(fromSync = false): Promise<void> {
throw new Error('storageService.upload: We are offline!');
}

// Rate limit uploads coming from syncing
if (fromSync) {
uploadBucket.push(Date.now());
if (uploadBucket.length >= 3) {
const [firstMostRecentWrite] = uploadBucket;

if (isMoreRecentThan(5 * MINUTE, firstMostRecentWrite)) {
throw new Error(
'storageService.uploadManifest: too many writes too soon.'
);
}

uploadBucket.shift();
}
}

if (!window.storage.get('storageKey')) {
// requesting new keys runs the sync job which will detect the conflict
// and re-run the upload job once we're merged and up-to-date.
Expand All @@ -1068,11 +1078,12 @@ async function upload(fromSync = false): Promise<void> {
return;
}

let previousManifest: ManifestRecordClass | undefined;
if (!fromSync) {
// Syncing before we upload so that we repair any unknown records and
// records with errors as well as ensure that we have the latest up to date
// manifest.
await sync();
previousManifest = await sync();
}

const localManifestVersion = window.storage.get('manifestVersion') || 0;
Expand All @@ -1084,7 +1095,7 @@ async function upload(fromSync = false): Promise<void> {
);

try {
const generatedManifest = await generateManifest(version);
const generatedManifest = await generateManifest(version, previousManifest);
await uploadManifest(version, generatedManifest);
} catch (err) {
if (err.code === 409) {
Expand Down Expand Up @@ -1130,7 +1141,9 @@ export const storageServiceUploadJob = debounce(() => {
return;
}

storageJobQueue(upload, `upload v${window.storage.get('manifestVersion')}`);
storageJobQueue(async () => {
await upload();
}, `upload v${window.storage.get('manifestVersion')}`);
}, 500);

export const runStorageServiceSyncJob = debounce(() => {
Expand All @@ -1141,5 +1154,7 @@ export const runStorageServiceSyncJob = debounce(() => {
return;
}

storageJobQueue(sync, `sync v${window.storage.get('manifestVersion')}`);
storageJobQueue(async () => {
await sync();
}, `sync v${window.storage.get('manifestVersion')}`);
}, 500);
4 changes: 4 additions & 0 deletions ts/services/storageRecordOps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ export async function mergeGroupV1Record(
conversation.idForLogging()
);
} else {
if (groupV1Record.id.byteLength !== 16) {
throw new Error('Not a valid gv1');
}

conversation = await window.ConversationController.getOrCreateAndWait(
groupId,
'group'
Expand Down