Skip to content

Commit

Permalink
fix(data-ingestion): log end user (#3086)
Browse files Browse the repository at this point in the history
## Changes

- Log end user to BigQuery
- Move endUser.service in shared because it's needed there
- Try to have more type safety in the schema
- Try to add schema migration
- Add a feature flag so we can test locally and enable this feature more easily
  • Loading branch information
bodinsamuel authored Dec 3, 2024
1 parent c53fdc6 commit 32cd029
Show file tree
Hide file tree
Showing 32 changed files with 257 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ export interface NangoProps {
runnerFlags: RunnerFlags;
debug: boolean;
startedAt: Date;
endUser: {
id: number;
endUserId: string | null;
orgId: string | null;
} | null;
axios?: {
request?: AxiosInterceptorManager<AxiosRequestConfig>;
response?: {
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/lib/services/dryrun.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ export class DryRunService {
validateSyncRecords: this.validation,
validateSyncMetadata: false
},
startedAt: new Date()
startedAt: new Date(),
endUser: null
};
if (options.saveResponses) {
nangoProps.rawSaveOutput = new Map<string, unknown[]>();
Expand Down
119 changes: 89 additions & 30 deletions packages/data-ingestion/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BigQuery } from '@google-cloud/bigquery';
import type { BigQuery as BigQueryType } from '@google-cloud/bigquery';
import { getLogger, isCloud } from '@nangohq/utils';
import { getLogger, flagHasBigQuery } from '@nangohq/utils';

const logger = getLogger('BigQueryClient');

Expand All @@ -21,8 +21,44 @@ interface RunScriptRow {
runTimeInSeconds: number;
createdAt: number;
internalIntegrationId: number | null;
endUser: { id: number; endUserId: string | null; orgId: string | null } | null;
}

// BigQuery column definitions for the script-runs table.
// This array is the single source of truth for the table schema: it is used
// both when creating the table and (per the migration path below) to append
// any columns missing from an already-existing table, so only additive,
// backward-compatible changes are safe here.
const fields = [
    { name: 'executionType', type: 'STRING' },
    { name: 'internalConnectionId', type: 'INTEGER' },
    { name: 'connectionId', type: 'STRING' },
    { name: 'accountId', type: 'INTEGER' },
    { name: 'accountName', type: 'STRING' },
    { name: 'scriptName', type: 'STRING' },
    { name: 'scriptType', type: 'STRING' },
    { name: 'environmentId', type: 'INTEGER' },
    { name: 'environmentName', type: 'STRING' },
    { name: 'providerConfigKey', type: 'STRING' },
    { name: 'status', type: 'STRING' },
    { name: 'syncId', type: 'STRING' },
    { name: 'content', type: 'STRING' },
    { name: 'runTimeInSeconds', type: 'FLOAT' },
    { name: 'createdAt', type: 'INTEGER' },
    { name: 'internalIntegrationId', type: 'INTEGER' },
    { name: 'endUserId', type: 'INTEGER' },
    { name: 'endUserUserId', type: 'STRING' },
    { name: 'endUserOrgId', type: 'STRING' }
] as const;

// Maps a BigQuery column type name to its TypeScript representation.
interface TypeMap {
    STRING: string;
    INTEGER: number;
    FLOAT: number;
}

// Derives a record type from a schema definition: one property per field,
// keyed by the field's `name` and typed by its BigQuery `type`. Every
// property also admits null/undefined since rows may omit values.
// NOTE: the generic constraint already guarantees `K['type'] extends keyof
// TypeMap`, so TypeMap can be indexed directly — no conditional type needed.
type RecordType<T extends readonly { name: string; type: keyof TypeMap }[]> = {
    [K in T[number] as K['name']]: TypeMap[K['type']] | null | undefined;
};

// Row shape for inserts, inferred from `fields` so the two cannot drift apart.
type Schema = RecordType<typeof fields>;

class BigQueryClient {
private client: BigQuery;
private datasetName: string;
Expand All @@ -44,11 +80,13 @@ class BigQueryClient {
}

private async initialize() {
if (!flagHasBigQuery) {
return;
}

try {
if (isCloud) {
await this.createDataSet();
await this.createTable();
}
await this.createDataSet();
await this.createTable();
} catch (e) {
logger.error('Error initializing', e);
}
Expand All @@ -67,38 +105,59 @@ class BigQueryClient {
const [exists] = await table.exists();
if (!exists) {
await table.create({
schema: {
fields: [
{ name: 'executionType', type: 'STRING' },
{ name: 'internalConnectionId', type: 'INTEGER' },
{ name: 'connectionId', type: 'STRING' },
{ name: 'accountId', type: 'INTEGER' },
{ name: 'accountName', type: 'STRING' },
{ name: 'scriptName', type: 'STRING' },
{ name: 'scriptType', type: 'STRING' },
{ name: 'environmentId', type: 'INTEGER' },
{ name: 'environmentName', type: 'STRING' },
{ name: 'providerConfigKey', type: 'STRING' },
{ name: 'status', type: 'STRING' },
{ name: 'syncId', type: 'STRING' },
{ name: 'content', type: 'STRING' },
{ name: 'runTimeInSeconds', type: 'FLOAT' },
{ name: 'createdAt', type: 'INTEGER' },
{ name: 'internalIntegrationId', type: 'INTEGER' }
]
}
schema: { fields }
});
} else {
// If the table exists, retrieve the current schema
const [metadata] = await table.getMetadata();
const existingFields = metadata.schema.fields as { name: string }[];

// Add new fields that don't already exist in the schema
const existingFieldNames = existingFields.map((field) => field.name);
const newFields = fields.filter((field) => !existingFieldNames.includes(field.name));

if (newFields.length > 0) {
// Update the schema with the new fields
const updatedFields = [...existingFields, ...newFields];
await table.setMetadata({
schema: { fields: updatedFields }
});
logger.info('Schema updated successfully with new fields:', newFields);
}
}
}

public async insert(data: RunScriptRow, tableName?: string) {
if (!flagHasBigQuery) {
return;
}

const table = tableName || this.tableName;
try {
if (isCloud) {
await this.client.dataset(this.datasetName).table(table).insert(data);
}
} catch (e) {
logger.error('Error inserting into BigQuery', e);
const insertData: Schema = {
executionType: data.executionType,
internalConnectionId: data.internalConnectionId,
connectionId: data.connectionId,
accountId: data.accountId,
accountName: data.accountName,
scriptName: data.scriptName,
scriptType: data.scriptType,
environmentId: data.environmentId,
environmentName: data.environmentName,
providerConfigKey: data.providerConfigKey,
status: data.status,
syncId: data.syncId,
content: data.content,
runTimeInSeconds: data.runTimeInSeconds,
createdAt: data.createdAt,
internalIntegrationId: data.internalIntegrationId,
endUserId: data.endUser?.id,
endUserOrgId: data.endUser?.orgId,
endUserUserId: data.endUser?.endUserId
};
await this.client.dataset(this.datasetName).table(table).insert(insertData);
} catch (err) {
logger.error('Error inserting into BigQuery', err);
}
}
}
Expand Down
28 changes: 22 additions & 6 deletions packages/jobs/lib/execution/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ import {
errorManager,
featureFlags,
getApiUrl,
getEndUserByConnectionId,
getSyncConfigRaw
} from '@nangohq/shared';
import { logContextGetter } from '@nangohq/logs';
import type { DBEnvironment, DBTeam } from '@nangohq/types';
import { startScript } from './operations/start.js';
import { bigQueryClient, slackService } from '../clients.js';
import { getRunnerFlags } from '../utils/flags.js';
import db from '@nangohq/database';

export async function startAction(task: TaskAction): Promise<Result<void>> {
let account: DBTeam | undefined;
let environment: DBEnvironment | undefined;
let providerConfig: Config | undefined | null;
let syncConfig: SyncConfig | null = null;
let endUser: NangoProps['endUser'] | null = null;

try {
const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: task.connection.environment_id });
if (!accountAndEnv) {
Expand All @@ -47,6 +51,11 @@ export async function startAction(task: TaskAction): Promise<Result<void>> {
throw new Error(`Action config not found: ${task.id}`);
}

const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id });
if (getEndUser.isOk()) {
endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null };
}

const logCtx = await logContextGetter.get({ id: String(task.activityLogId) });
await logCtx.info(`Starting action '${task.actionName}'`, {
input: task.input,
Expand Down Expand Up @@ -74,7 +83,8 @@ export async function startAction(task: TaskAction): Promise<Result<void>> {
syncConfig: syncConfig,
debug: false,
runnerFlags: await getRunnerFlags(featureFlags),
startedAt: new Date()
startedAt: new Date(),
endUser
};

metrics.increment(metrics.Types.ACTION_EXECUTION, 1, { accountId: account.id });
Expand Down Expand Up @@ -108,7 +118,8 @@ export async function startAction(task: TaskAction): Promise<Result<void>> {
error,
syncConfig,
environment: { id: task.connection.environment_id, name: environment?.name || 'unknown' },
...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {})
...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}),
endUser
});
return Err(error);
}
Expand Down Expand Up @@ -145,7 +156,8 @@ export async function handleActionSuccess({ nangoProps }: { nangoProps: NangoPro
content: `The action "${nangoProps.syncConfig.sync_name}" has been completed successfully.`,
runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000,
createdAt: Date.now(),
internalIntegrationId: nangoProps.syncConfig.nango_config_id
internalIntegrationId: nangoProps.syncConfig.nango_config_id,
endUser: nangoProps.endUser
});
}

Expand All @@ -165,7 +177,8 @@ export async function handleActionError({ nangoProps, error }: { nangoProps: Nan
error,
environment: { id: nangoProps.environmentId, name: nangoProps.environmentName || 'unknown' },
syncConfig: nangoProps.syncConfig,
...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {})
...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}),
endUser: nangoProps.endUser
});
}

Expand All @@ -179,7 +192,8 @@ async function onFailure({
activityLogId,
syncConfig,
runTime,
error
error,
endUser
}: {
connection: NangoConnection;
team?: { id: number; name: string };
Expand All @@ -191,6 +205,7 @@ async function onFailure({
syncConfig: SyncConfig | null;
runTime: number;
error: NangoError;
endUser: NangoProps['endUser'];
}): Promise<void> {
if (team) {
void bigQueryClient.insert({
Expand All @@ -209,7 +224,8 @@ async function onFailure({
content: error.message,
runTimeInSeconds: runTime,
createdAt: Date.now(),
internalIntegrationId: syncConfig?.nango_config_id || null
internalIntegrationId: syncConfig?.nango_config_id || null,
endUser
});
}
const logCtx = await logContextGetter.get({ id: activityLogId });
Expand Down
29 changes: 22 additions & 7 deletions packages/jobs/lib/execution/onEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ import { Err, metrics, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';
import type { TaskOnEvent } from '@nangohq/nango-orchestrator';
import type { Config, SyncConfig, NangoConnection, NangoProps } from '@nangohq/shared';
import { configService, environmentService, featureFlags, getApiUrl, NangoError } from '@nangohq/shared';
import { configService, environmentService, featureFlags, getApiUrl, getEndUserByConnectionId, NangoError } from '@nangohq/shared';
import { logContextGetter } from '@nangohq/logs';
import type { DBEnvironment, DBTeam } from '@nangohq/types';
import { startScript } from './operations/start.js';
import { bigQueryClient } from '../clients.js';
import db from '@nangohq/database';
import { getRunnerFlags } from '../utils/flags.js';

export async function startOnEvent(task: TaskOnEvent): Promise<Result<void>> {
let account: DBTeam | undefined;
let environment: DBEnvironment | undefined;
let providerConfig: Config | undefined | null;
let syncConfig: SyncConfig | null = null;
let endUser: NangoProps['endUser'] | null = null;

try {
const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: task.connection.environment_id });
if (!accountAndEnv) {
Expand All @@ -27,6 +30,11 @@ export async function startOnEvent(task: TaskOnEvent): Promise<Result<void>> {
throw new Error(`Provider config not found for connection: ${task.connection.connection_id}`);
}

const getEndUser = await getEndUserByConnectionId(db.knex, { connectionId: task.connection.id });
if (getEndUser.isOk()) {
endUser = { id: getEndUser.value.id, endUserId: getEndUser.value.endUserId, orgId: getEndUser.value.organization?.organizationId || null };
}

const logCtx = await logContextGetter.get({ id: String(task.activityLogId) });

await logCtx.info(`Starting script '${task.onEventName}'`, {
Expand Down Expand Up @@ -72,7 +80,8 @@ export async function startOnEvent(task: TaskOnEvent): Promise<Result<void>> {
syncConfig: syncConfig,
debug: false,
runnerFlags: await getRunnerFlags(featureFlags),
startedAt: new Date()
startedAt: new Date(),
endUser
};

metrics.increment(metrics.Types.ON_EVENT_SCRIPT_EXECUTION, 1, { accountId: account.id });
Expand Down Expand Up @@ -104,7 +113,8 @@ export async function startOnEvent(task: TaskOnEvent): Promise<Result<void>> {
error,
environment: { id: task.connection.environment_id, name: environment?.name || 'unknown' },
syncConfig,
...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {})
...(account?.id && account?.name ? { team: { id: account.id, name: account.name } } : {}),
endUser
});
return Err(error);
}
Expand All @@ -128,7 +138,8 @@ export async function handleOnEventSuccess({ nangoProps }: { nangoProps: NangoPr
content,
runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000,
createdAt: Date.now(),
internalIntegrationId: nangoProps.syncConfig.nango_config_id
internalIntegrationId: nangoProps.syncConfig.nango_config_id,
endUser: nangoProps.endUser
});
const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) });
await logCtx.success();
Expand All @@ -149,7 +160,8 @@ export async function handleOnEventError({ nangoProps, error }: { nangoProps: Na
error,
environment: { id: nangoProps.environmentId, name: nangoProps.environmentName || 'unknown' },
syncConfig: nangoProps.syncConfig,
...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {})
...(nangoProps.team ? { team: { id: nangoProps.team.id, name: nangoProps.team.name } } : {}),
endUser: nangoProps.endUser
});
}

Expand All @@ -162,7 +174,8 @@ async function onFailure({
activityLogId,
syncConfig,
runTime,
error
error,
endUser
}: {
connection: NangoConnection;
team?: { id: number; name: string };
Expand All @@ -173,6 +186,7 @@ async function onFailure({
syncConfig: SyncConfig | null;
runTime: number;
error: NangoError;
endUser: NangoProps['endUser'];
}): Promise<void> {
if (team) {
void bigQueryClient.insert({
Expand All @@ -191,7 +205,8 @@ async function onFailure({
content: error.message,
runTimeInSeconds: runTime,
createdAt: Date.now(),
internalIntegrationId: syncConfig?.nango_config_id || null
internalIntegrationId: syncConfig?.nango_config_id || null,
endUser
});
}
const logCtx = await logContextGetter.get({ id: activityLogId });
Expand Down
Loading

0 comments on commit 32cd029

Please sign in to comment.