Skip to content

Commit

Permalink
retry deadlock_detected error when upserting records (NangoHQ#2067)
Browse files Browse the repository at this point in the history
## Describe your changes

The Postgres documentation advises retrying transactions that fail with deadlock errors

https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html

This commit also moves `retry` to `packages/utils`

Tested Locally
  • Loading branch information
TBonnin authored Apr 30, 2024
1 parent ed46305 commit 82b152e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 18 deletions.
39 changes: 27 additions & 12 deletions packages/records/lib/models/records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { removeDuplicateKey, getUniqueId } from '../helpers/uniqueKey.js';
import { logger } from '../utils/logger.js';
import { resultErr, resultOk, type Result } from '@nangohq/utils';
import type { Knex } from 'knex';
import { retry } from '@nangohq/utils';

dayjs.extend(utc);

Expand Down Expand Up @@ -197,18 +198,32 @@ export async function upsert(records: FormattedRecord[], connectionId: number, m

let summary: UpsertSummary = { addedKeys: [], updatedKeys: [], deletedKeys: [], nonUniqueKeys };
try {
await db.transaction(async (trx) => {
for (let i = 0; i < recordsWithoutDuplicates.length; i += BATCH_SIZE) {
const chunk = recordsWithoutDuplicates.slice(i, i + BATCH_SIZE);
const chunkSummary = await getUpsertSummary(chunk, connectionId, model, nonUniqueKeys, softDelete, trx);
summary = {
addedKeys: [...summary.addedKeys, ...chunkSummary.addedKeys],
updatedKeys: [...summary.updatedKeys, ...chunkSummary.updatedKeys],
deletedKeys: [...(summary.deletedKeys || []), ...(chunkSummary.deletedKeys || [])],
nonUniqueKeys: nonUniqueKeys
};
const encryptedRecords = encryptRecords(chunk);
await trx.from<FormattedRecord>(RECORDS_TABLE).insert(encryptedRecords).onConflict(['connection_id', 'external_id', 'model']).merge();
const upserting = async () =>
await db.transaction(async (trx) => {
for (let i = 0; i < recordsWithoutDuplicates.length; i += BATCH_SIZE) {
const chunk = recordsWithoutDuplicates.slice(i, i + BATCH_SIZE);
const chunkSummary = await getUpsertSummary(chunk, connectionId, model, nonUniqueKeys, softDelete, trx);
summary = {
addedKeys: [...summary.addedKeys, ...chunkSummary.addedKeys],
updatedKeys: [...summary.updatedKeys, ...chunkSummary.updatedKeys],
deletedKeys: [...(summary.deletedKeys || []), ...(chunkSummary.deletedKeys || [])],
nonUniqueKeys: nonUniqueKeys
};
const encryptedRecords = encryptRecords(chunk);
await trx.from<FormattedRecord>(RECORDS_TABLE).insert(encryptedRecords).onConflict(['connection_id', 'external_id', 'model']).merge();
}
});
// Retry upserting if deadlock detected
// https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html
await retry(upserting, {
maxAttempts: 3,
delayMs: 500,
retryIf: (error) => {
if ('code' in error) {
const errorCode = (error as { code: string }).code;
return errorCode === '40P01'; // deadlock_detected
}
return false;
}
});

Expand Down
5 changes: 3 additions & 2 deletions packages/shared/lib/db/database.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import knex from 'knex';
import type { Knex } from 'knex';
import { retry } from '../utils/retry.js';
import { retry } from '@nangohq/utils';

export function getDbConfig({ timeoutMs }: { timeoutMs: number }): Knex.Config<any> {
return {
Expand Down Expand Up @@ -34,7 +34,8 @@ export class KnexDatabase {
async migrate(directory: string): Promise<any> {
return retry(async () => await this.knex.migrate.latest({ directory: directory, tableName: '_nango_auth_migrations', schemaName: this.schema() }), {
maxAttempts: 4,
delayMs: (attempt) => 500 * attempt
delayMs: (attempt) => 500 * attempt,
retryIf: () => true
});
}

Expand Down
1 change: 1 addition & 0 deletions packages/utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './id.js';
export * from './result.js';
export * from './encryption.js';
export * as metrics from './telemetry/metrics.js';
export * from './retry.js';
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// Options for the retry() helper below.
interface RetryConfig {
// Total number of attempts, including the first call (the loop runs attempt = 1..maxAttempts).
maxAttempts: number;
// Wait between attempts: either a fixed number of milliseconds, or a function of the
// 1-based attempt number (allows linear/exponential backoff, e.g. (attempt) => 500 * attempt).
delayMs: number | ((attempt: number) => number);
// Predicate invoked with the caught error; a retry happens only when it returns true
// (and attempts remain). Return `() => true` to retry unconditionally.
retryIf: (error: Error) => boolean;
}

export async function retry<T>(fn: () => T, config: RetryConfig): Promise<T> {
const { maxAttempts, delayMs } = config;
const { maxAttempts, delayMs, retryIf } = config;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return fn();
} catch (error) {
if (attempt < maxAttempts) {
if (attempt < maxAttempts && retryIf(error as Error)) {
const delay = typeof delayMs === 'number' ? delayMs : delayMs(attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ describe('retry', () => {
},
{
maxAttempts: 3,
delayMs: () => 0
delayMs: () => 0,
retryIf: () => true
}
);
expect(result).toEqual(3);
Expand All @@ -30,12 +31,36 @@ describe('retry', () => {
},
{
maxAttempts: 3,
delayMs: () => 0
delayMs: () => 0,
retryIf: () => true
}
);
} catch (error: any) {
expect(error.message).toEqual('my error');
}
expect(count).toBe(3);
});

it('should not retry if error condition is false ', async () => {
let count = 0;
try {
await retry(
() => {
count++;
if (count < 3) {
throw new Error('my error');
}
return count;
},
{
maxAttempts: 3,
delayMs: () => 0,
retryIf: (error) => error.message === 'another error'
}
);
} catch (error: any) {
expect(error.message).toEqual('my error');
}
expect(count).toBe(1);
});
});

0 comments on commit 82b152e

Please sign in to comment.