Skip to content

Commit

Permalink
feat(sync): allow to force a full sync anytime (NangoHQ#1658)
Browse files Browse the repository at this point in the history
  • Loading branch information
bodinsamuel authored Feb 21, 2024
1 parent 094f2c6 commit dc84a19
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 209 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ LOG_LEVEL=info
#
###############################################################################


# Configure where integrations will be loaded from
NANGO_INTEGRATIONS_FULL_PATH=

# Internal Datadog Telemetry
# This telemetry logs metrics/traces in your own Datadog instance
NANGO_TELEMETRY_SDK=false
4 changes: 4 additions & 0 deletions docs-v2/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,10 @@ paths:
description: A list of sync names that you wish to trigger.
items:
type: string
full_resync:
type: boolean
description: Clear the records and reset the "lastSyncDate" associated with the sync before triggering a new sync job.

responses:
'200':
description: Successfully triggered the sync
Expand Down
77 changes: 32 additions & 45 deletions packages/jobs/lib/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import {
updateLatestJobSyncStatus,
LogTypes,
isInitialSyncStillRunning,
initialSyncExists,
getSyncByIdAndName,
logger
logger,
getLastSyncDate
} from '@nangohq/shared';
import integrationService from './integration.service.js';
import type { ContinuousSyncArgs, InitialSyncArgs, ActionArgs, WebhookArgs } from './models/worker';

export async function routeSync(args: InitialSyncArgs): Promise<boolean | object | null> {
const { syncId, syncJobId, syncName, activityLogId, nangoConnection, debug } = args;
const { syncId, syncJobId, syncName, nangoConnection, debug } = args;
let environmentId = nangoConnection?.environment_id;

// https://typescript.temporal.io/api/classes/activity.Context
Expand All @@ -40,17 +40,7 @@ export async function routeSync(args: InitialSyncArgs): Promise<boolean | object
}
const syncConfig: ProviderConfig = (await configService.getProviderConfig(nangoConnection?.provider_config_key as string, environmentId)) as ProviderConfig;

return syncProvider(
syncConfig,
syncId,
syncJobId,
syncName,
SyncType.INITIAL,
{ ...nangoConnection, environment_id: environmentId },
activityLogId,
context,
debug
);
return syncProvider(syncConfig, syncId, syncJobId, syncName, SyncType.INITIAL, { ...nangoConnection, environment_id: environmentId }, context, debug);
}

export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
Expand Down Expand Up @@ -83,7 +73,7 @@ export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
}

export async function scheduleAndRouteSync(args: ContinuousSyncArgs): Promise<boolean | object | null> {
const { syncId, activityLogId, syncName, nangoConnection, debug } = args;
const { syncId, syncName, nangoConnection, debug } = args;
let environmentId = nangoConnection?.environment_id;
let syncJobId;

Expand All @@ -107,7 +97,8 @@ export async function scheduleAndRouteSync(args: ContinuousSyncArgs): Promise<bo

// https://typescript.temporal.io/api/classes/activity.Context
const context: Context = Context.current();
const syncType = (await initialSyncExists(syncId as string)) ? SyncType.INCREMENTAL : SyncType.INITIAL;
const lastSyncDate = await getLastSyncDate(syncId);
const syncType = lastSyncDate ? SyncType.INCREMENTAL : SyncType.INITIAL;
try {
if (!nangoConnection?.environment_id) {
environmentId = (await environmentService.getEnvironmentIdForAccountAssumingProd(nangoConnection.account_id as number)) as number;
Expand Down Expand Up @@ -142,7 +133,6 @@ export async function scheduleAndRouteSync(args: ContinuousSyncArgs): Promise<bo
syncName,
syncType,
{ ...nangoConnection, environment_id: environmentId },
activityLogId ?? 0,
context,
debug
);
Expand Down Expand Up @@ -178,7 +168,7 @@ export async function scheduleAndRouteSync(args: ContinuousSyncArgs): Promise<bo
syncName
});

await errorManager.report(content, {
errorManager.report(content, {
environmentId,
source: ErrorSourceEnum.PLATFORM,
operation: LogActionEnum.SYNC,
Expand Down Expand Up @@ -207,30 +197,26 @@ export async function syncProvider(
syncName: string,
syncType: SyncType,
nangoConnection: NangoConnection,
existingActivityLogId: number,
temporalContext: Context,
debug = false
): Promise<boolean | object | null> {
const action = syncType === SyncType.INITIAL ? LogActionEnum.FULL_SYNC : LogActionEnum.SYNC;
try {
let activityLogId = existingActivityLogId;

if (syncType === SyncType.INCREMENTAL || existingActivityLogId === 0) {
const log = {
level: 'info' as LogLevel,
success: null,
action: LogActionEnum.SYNC,
start: Date.now(),
end: Date.now(),
timestamp: Date.now(),
connection_id: nangoConnection?.connection_id as string,
provider_config_key: nangoConnection?.provider_config_key as string,
provider: syncConfig.provider,
session_id: syncJobId ? syncJobId?.toString() : '',
environment_id: nangoConnection?.environment_id as number,
operation_name: syncName
};
activityLogId = (await createActivityLog(log)) as number;
}
const log = {
level: 'info' as LogLevel,
success: null,
action,
start: Date.now(),
end: Date.now(),
timestamp: Date.now(),
connection_id: nangoConnection?.connection_id as string,
provider_config_key: nangoConnection?.provider_config_key as string,
provider: syncConfig.provider,
session_id: syncJobId ? syncJobId?.toString() : '',
environment_id: nangoConnection?.environment_id as number,
operation_name: syncName
};
const activityLogId = (await createActivityLog(log)) as number;

if (debug) {
await createActivityLogMessage({
Expand Down Expand Up @@ -264,7 +250,7 @@ export async function syncProvider(
const log = {
level: 'info' as LogLevel,
success: false,
action: LogActionEnum.SYNC,
action,
start: Date.now(),
end: Date.now(),
timestamp: Date.now(),
Expand All @@ -284,18 +270,18 @@ export async function syncProvider(
content
});

await telemetry.log(LogTypes.SYNC_OVERLAP, content, LogActionEnum.SYNC, {
await telemetry.log(LogTypes.SYNC_OVERLAP, content, action, {
environmentId: String(nangoConnection?.environment_id),
syncId,
connectionId: nangoConnection?.connection_id as string,
providerConfigKey: nangoConnection?.provider_config_key as string,
syncName
});

await errorManager.report(content, {
errorManager.report(content, {
environmentId: nangoConnection?.environment_id as number,
source: ErrorSourceEnum.PLATFORM,
operation: LogActionEnum.SYNC,
operation: action,
metadata: {
connectionId: nangoConnection?.connection_id as string,
providerConfigKey: nangoConnection?.provider_config_key as string,
Expand Down Expand Up @@ -410,7 +396,7 @@ export async function reportFailure(

export async function cancelActivity(workflowArguments: InitialSyncArgs | ContinuousSyncArgs): Promise<void> {
try {
const { syncId, activityLogId, syncName, nangoConnection, debug } = workflowArguments;
const { syncId, syncName, nangoConnection, debug } = workflowArguments;

const context: Context = Context.current();

Expand All @@ -421,7 +407,8 @@ export async function cancelActivity(workflowArguments: InitialSyncArgs | Contin
environmentId
)) as ProviderConfig;

const syncType = (await initialSyncExists(syncId as string)) ? SyncType.INCREMENTAL : SyncType.INITIAL;
const lastSyncDate = await getLastSyncDate(syncId);
const syncType = lastSyncDate ? SyncType.INCREMENTAL : SyncType.INITIAL;

const syncRun = new syncRunService({
integrationService,
Expand All @@ -430,7 +417,7 @@ export async function cancelActivity(workflowArguments: InitialSyncArgs | Contin
nangoConnection,
syncType,
syncName,
activityLogId,
activityLogId: undefined,
provider: syncConfig.provider,
temporalContext: context,
debug: Boolean(debug)
Expand Down
2 changes: 0 additions & 2 deletions packages/jobs/lib/models/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ export interface InitialSyncArgs {
syncId: string;
syncJobId: number;
syncName: string;
activityLogId: number;
nangoConnection: NangoConnection;
debug?: boolean;
}

export interface ContinuousSyncArgs {
syncId: string;
activityLogId: number;
syncName: string;
syncData: NangoIntegrationData;
nangoConnection: NangoConnection;
Expand Down
5 changes: 3 additions & 2 deletions packages/node-client/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ export class Nango {
return response.data;
}

public async triggerSync(providerConfigKey: string, syncs?: string[], connectionId?: string): Promise<void> {
public async triggerSync(providerConfigKey: string, syncs?: string[], connectionId?: string, fullResync?: boolean): Promise<void> {
const url = `${this.serverUrl}/sync/trigger`;

if (typeof syncs === 'string') {
Expand All @@ -414,7 +414,8 @@ export class Nango {
const body = {
syncs: syncs || [],
provider_config_key: providerConfigKey,
connection_id: connectionId
connection_id: connectionId,
full_resync: fullResync
};

return axios.post(url, body, { headers: this.enrichHeaders() });
Expand Down
26 changes: 14 additions & 12 deletions packages/server/lib/controllers/sync.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,17 @@ class SyncController {

public async trigger(req: Request, res: Response, next: NextFunction) {
try {
const { syncs: syncNames } = req.body;
let { provider_config_key, connection_id } = req.body;

if (!provider_config_key) {
provider_config_key = req.get('Provider-Config-Key') as string;
}

if (!connection_id) {
connection_id = req.get('Connection-Id') as string;
}
const { syncs: syncNames, full_resync } = req.body;

const provider_config_key: string | undefined = req.body.provider_config_key || req.get('Provider-Config-Key');
if (!provider_config_key) {
res.status(400).send({ message: 'Missing provider config key' });

return;
}

const connection_id: string | undefined = req.body.connection_id || req.get('Connection-Id');

if (typeof syncNames === 'string') {
res.status(400).send({ message: 'Syncs must be an array' });

Expand All @@ -271,13 +265,18 @@ class SyncController {
return;
}

if (full_resync && typeof full_resync !== 'boolean') {
res.status(400).send({ message: 'full_resync must be a boolean' });
return;
}

const environmentId = getEnvironmentId(res);

const { success, error } = await syncOrchestrator.runSyncCommand(
environmentId,
provider_config_key as string,
provider_config_key,
syncNames as string[],
SyncCommand.RUN,
full_resync ? SyncCommand.RUN_FULL : SyncCommand.RUN,
connection_id
);

Expand Down Expand Up @@ -627,6 +626,9 @@ class SyncController {
case SyncCommand.RUN:
event = AnalyticsTypes.SYNC_RUN;
break;
case SyncCommand.CANCEL:
event = AnalyticsTypes.SYNC_CANCEL;
break;
}

analytics.trackByEnvironmentId(event, environment.id, {
Expand Down
2 changes: 1 addition & 1 deletion packages/server/lib/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import tracer from './tracer.js';
import './utils/config.js';
import bodyParser from 'body-parser';
import multer from 'multer';
import _ from './utils/config.js';
import oauthController from './controllers/oauth.controller.js';
import configController from './controllers/config.controller.js';
import providerController from './controllers/provider.controller.js';
Expand Down
Loading

0 comments on commit dc84a19

Please sign in to comment.