Skip to content

Commit

Permalink
Write records to new database (#2000)
Browse files Browse the repository at this point in the history
This PR implements the double write to the new records database. No
reading from this db is done yet.
  • Loading branch information
TBonnin authored Apr 18, 2024
1 parent 7d875dc commit 6ecb055
Show file tree
Hide file tree
Showing 31 changed files with 237 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-flows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
run: |
# Build, install CLI and verify it can run
npm ci
npm run build -w @nangohq/node -w @nangohq/utils -w @nangohq/shared -w nango
npm run build -w @nangohq/node -w @nangohq/utils -w @nangohq/records -w @nangohq/shared -w nango
npm install -g ./packages/cli
NANGO_CLI_UPGRADE_MODE=ignore nango version --debug
Expand Down
9 changes: 4 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions packages/cli/lib/services/dryrun.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { parseSecretKey, printDebug, hostport, getConnection, getConfig } from '
import configService from './config.service.js';
import compileService from './compile.service.js';
import integrationService from './local-integration.service.js';
import type { RecordsServiceInterface } from '@nangohq/shared/lib/services/sync/run.service.js';

interface RunArgs extends GlobalOptions {
sync: string;
Expand Down Expand Up @@ -148,8 +149,23 @@ class DryRunService {
messages: []
};

// dry-run mode does not read or write to the records database
// so we can safely mock the records service
const recordsService: RecordsServiceInterface = {
markNonCurrentGenerationRecordsAsDeleted: (
_connectionId: number,
_model: string,
_syncId: string,
_generation: number
// eslint-disable-next-line @typescript-eslint/require-await
): Promise<string[]> => {
return Promise.resolve([]);
}
};

const syncRun = new syncRunService({
integrationService,
recordsService,
writeToDb: false,
nangoConnection,
provider,
Expand Down
1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ WORKDIR /nango
COPY packages/node-client/ packages/node-client/
COPY packages/shared/ packages/shared/
COPY packages/utils/ packages/utils/
COPY packages/records/ packages/records/
COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/jobs/ packages/jobs/
COPY packages/runner/ packages/runner/
Expand Down
5 changes: 5 additions & 0 deletions packages/jobs/lib/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
getSyncByIdAndName,
getLastSyncDate
} from '@nangohq/shared';
import { records as recordsService } from '@nangohq/records';
import { getLogger, env } from '@nangohq/utils';
import { BigQueryClient } from '@nangohq/data-ingestion/dist/index.js';
import integrationService from './integration.service.js';
Expand Down Expand Up @@ -61,6 +62,7 @@ export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
const syncRun = new syncRunService({
bigQueryClient,
integrationService,
recordsService,
writeToDb: true,
nangoConnection,
syncName: actionName,
Expand Down Expand Up @@ -234,6 +236,7 @@ export async function syncProvider(
const syncRun = new syncRunService({
bigQueryClient,
integrationService,
recordsService,
writeToDb: true,
syncId,
syncJobId,
Expand Down Expand Up @@ -322,6 +325,7 @@ export async function runWebhook(args: WebhookArgs): Promise<boolean> {
const syncRun = new syncRunService({
bigQueryClient,
integrationService,
recordsService,
writeToDb: true,
nangoConnection,
syncJobId: syncJobId?.id as number,
Expand Down Expand Up @@ -415,6 +419,7 @@ export async function cancelActivity(workflowArguments: InitialSyncArgs | Contin
const syncRun = new syncRunService({
bigQueryClient,
integrationService,
recordsService,
writeToDb: true,
syncId,
nangoConnection,
Expand Down
4 changes: 3 additions & 1 deletion packages/jobs/lib/crons/autoIdleDemo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from '@nangohq/shared';
import { getLogger, isErr } from '@nangohq/utils';
import tracer from 'dd-trace';
import { records as recordsService } from '@nangohq/records';

const logger = getLogger('Jobs');

Expand Down Expand Up @@ -73,7 +74,8 @@ export async function exec(): Promise<void> {
environmentId: sync.environment_id,
providerConfigKey: sync.unique_key,
connectionId: sync.connection_id,
syncName: sync.name
syncName: sync.name,
recordsService
});
if (isErr(resTemporal)) {
continue;
Expand Down
2 changes: 2 additions & 0 deletions packages/jobs/lib/crons/deleteSyncsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
db,
findRecentlyDeletedSync
} from '@nangohq/shared';
import { records } from '@nangohq/records';
import { getLogger } from '@nangohq/utils';
import tracer from 'dd-trace';

Expand Down Expand Up @@ -73,6 +74,7 @@ export async function exec(): Promise<void> {
// ----
// hard delete records
const res = await syncDataService.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords });
await records.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords });
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalDeletedRecords);
}
});
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/nodemon.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../data-ingestion/dist", "../../.env"],
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
Expand Down
1 change: 1 addition & 0 deletions packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@nangohq/nango-runner": "^1.0.0",
"@nangohq/shared": "^0.39.18",
"@nangohq/utils": "file:../utils",
"@nangohq/records": "file:../records",
"@temporalio/activity": "^1.9.1",
"@temporalio/client": "^1.9.1",
"@temporalio/worker": "^1.9.1",
Expand Down
3 changes: 3 additions & 0 deletions packages/jobs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
{
"path": "../utils"
},
{
"path": "../records"
},
{
"path": "../data-ingestion"
}
Expand Down
75 changes: 64 additions & 11 deletions packages/persist/lib/controllers/persist.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { NextFunction, Request, Response } from 'express';
import type { LogLevel, DataResponse, DataRecord, UpsertResponse } from '@nangohq/shared';
import { records as recordsService, format as recordsFormatter, type FormattedRecord, type UnencryptedRecordData } from '@nangohq/records';
import {
createActivityLogMessage,
errorManager,
Expand All @@ -14,7 +15,9 @@ import {
} from '@nangohq/shared';
import tracer from 'dd-trace';
import type { Span } from 'dd-trace';
import { resultErr, resultOk, isOk, type Result } from '@nangohq/utils';
import { getLogger, resultErr, resultOk, isOk, isErr, type Result } from '@nangohq/utils';

const logger = getLogger('PersistController');

type persistType = 'save' | 'delete' | 'update';
type RecordRequest = Request<
Expand Down Expand Up @@ -68,8 +71,18 @@ class PersistController {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, false);
const persist = async (records: FormattedRecord[], legacyRecords: DataRecord[]) => {
recordsService
.upsert(records, nangoConnectionId, model, false)
.then((res) => {
if (isErr(res)) {
throw res.err;
}
})
.catch((reason) => {
logger.error(`Failed to save records: ${reason}`);
});
return await dataService.upsert(legacyRecords, nangoConnectionId, model, activityLogId, environmentId, false);
};
const result = await PersistController.persistRecords({
persistType: 'save',
Expand Down Expand Up @@ -97,8 +110,18 @@ class PersistController {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, true);
const persist = async (records: FormattedRecord[], legacyRecords: DataRecord[]) => {
recordsService
.upsert(records, nangoConnectionId, model, true)
.then((res) => {
if (isErr(res)) {
throw res.err;
}
})
.catch((reason) => {
logger.error(`Failed to delete records: ${reason}`);
});
return await dataService.upsert(legacyRecords, nangoConnectionId, model, activityLogId, environmentId, true);
};
const result = await PersistController.persistRecords({
persistType: 'delete',
Expand Down Expand Up @@ -126,8 +149,18 @@ class PersistController {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.update(dataRecords, nangoConnectionId, model, activityLogId, environmentId);
const persist = async (records: FormattedRecord[], legacyRecords: DataRecord[]) => {
recordsService
.update(records, nangoConnectionId, model)
.then((res) => {
if (isErr(res)) {
throw res.err;
}
})
.catch((reason) => {
logger.error(`Failed to update records: ${reason}`);
});
return await dataService.update(legacyRecords, nangoConnectionId, model, activityLogId, environmentId);
};
const result = await PersistController.persistRecords({
persistType: 'update',
Expand Down Expand Up @@ -175,7 +208,7 @@ class PersistController {
records: Record<string, any>[];
activityLogId: number;
softDelete: boolean;
persistFunction: (records: DataRecord[]) => Promise<UpsertResponse>;
persistFunction: (records: FormattedRecord[], legacyRecords: DataRecord[]) => Promise<UpsertResponse>;
}): Promise<Result<void>> {
const active = tracer.scope().active();
const recordsSizeInBytes = Buffer.byteLength(JSON.stringify(records), 'utf8');
Expand All @@ -196,13 +229,21 @@ class PersistController {
}
});

let formattedRecords: FormattedRecord[] = [];
const formatting = recordsFormatter.formatRecords(records as UnencryptedRecordData[], nangoConnectionId, model, syncId, syncJobId, softDelete);
if (isErr(formatting)) {
logger.error('Failed to format records: ' + formatting.err.message);
} else {
formattedRecords = formatting.res;
}

const {
success,
error,
response: formattedRecords
response: legacyFormattedRecords
} = syncDataService.formatDataRecords(records as unknown as DataResponse[], nangoConnectionId, model, syncId, syncJobId, softDelete);

if (!success || formattedRecords === null) {
if (!success || legacyFormattedRecords === null) {
await createActivityLogMessage({
level: 'error',
environment_id: environmentId,
Expand All @@ -222,7 +263,19 @@ class PersistController {
return res;
}

const persistResult = await persistFunction(formattedRecords);
const persistResult = await persistFunction(formattedRecords, legacyFormattedRecords);
// TODO after migrating records
// add activityLog if persistResult.nonUniqueKeys is not empty
//
// for (const nonUniqueKey of persistResult.summary.nonUniqueKeys) {
// await createActivityLogMessage({
// level: 'error',
// environment_id,
// activity_log_id: activityLogId,
// content: `Found duplicate key '${nonUniqueKey}' for model ${model}. The record was ignored.`,
// timestamp: Date.now()
// });
// }

if (persistResult.success) {
const { summary } = persistResult;
Expand Down
1 change: 1 addition & 0 deletions packages/persist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"dependencies": {
"@nangohq/shared": "^0.39.18",
"@nangohq/utils": "file:../utils",
"@nangohq/records": "file:../records",
"dd-trace": "5.2.0",
"express": "^4.19.2",
"zod": "^3.22.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/persist/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"rootDir": "lib",
"outDir": "dist"
},
"references": [{ "path": "../shared" }, { "path": "../utils" }],
"references": [{ "path": "../shared" }, { "path": "../utils" }, { "path": "../records" }],
"include": ["lib/**/*"]
}
2 changes: 1 addition & 1 deletion packages/records/lib/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { ENVS, parseEnvs } from '@nangohq/utils';

export const envs = parseEnvs(ENVS.required({ RECORDS_DATABASE_URL: true, NANGO_ENCRYPTION_KEY: true }));
export const envs = parseEnvs(ENVS.required({ RECORDS_DATABASE_URL: true }));

export const filename = fileURLToPath(import.meta.url);
export const dirname = path.dirname(path.join(filename, '../../'));
11 changes: 0 additions & 11 deletions packages/records/lib/helpers/uniqueKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@ export function verifyUniqueKeysAreUnique(records: FormattedRecord[]): { nonUniq
}

export function removeDuplicateKey(records: FormattedRecord[]): { records: FormattedRecord[]; nonUniqueKeys: string[] } {
// TODO:
// for (const nonUniqueKey of nonUniqueKeys) {
// await createActivityLogMessage({
// level: 'error',
// environment_id,
// activity_log_id: activityLogId,
// content: `There was a duplicate key found: ${nonUniqueKey}. This record will be ignore in relation to the model ${model}.`,
// timestamp: Date.now()
// });
// }

const { nonUniqueKeys } = verifyUniqueKeysAreUnique(records);
const seen = new Set();
const recordsWithoutDuplicates = records.filter((record) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/records/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './db/migrate.js';
export * as records from './models/records.js';
export * as format from './helpers/format.js';
export * from './types.js';
Loading

0 comments on commit 6ecb055

Please sign in to comment.