Skip to content

Commit

Permalink
feat(pkgs): create kvstore (#2235)
Browse files Browse the repository at this point in the history
## Describe your changes

Fixes NAN-1071

- Create a reusable package for KVStore
Duplicated for now, can't remove it entirely from `shared` because
FeatureFlags and Locking, but it's a first step. I stopped exporting it
so it's no longer used by external package and I'll do a second pass to
remove it entirely.
Note: I did that because logs can't depend on shared


- Use it in Jobs


- Use it in Logs
As discussed orally, the main id is to be able to retrieve the indexName
quickly without doing a full search on the cluster. Maybe it's the fix
maybe not.
  • Loading branch information
bodinsamuel authored Jun 3, 2024
1 parent d15b5a0 commit 376c5b1
Show file tree
Hide file tree
Showing 26 changed files with 348 additions and 21 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY packages/shared/package.json ./packages/shared/package.json
COPY packages/utils/package.json ./packages/utils/package.json
COPY packages/webapp/package.json ./packages/webapp/package.json
COPY packages/data-ingestion/package.json ./packages/data-ingestion/package.json
COPY packages/kvstore/package.json ./packages/kvstore/package.json
COPY packages/logs/package.json ./packages/logs/package.json
COPY packages/records/package.json ./packages/records/package.json
COPY packages/types/package.json ./packages/types/package.json
Expand Down
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"packages/cli",
"packages/shared",
"packages/frontend",
"packages/kvstore",
"packages/logs",
"packages/node-client",
"packages/records",
Expand Down
1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ COPY packages/shared/ packages/shared/
COPY packages/utils/ packages/utils/
COPY packages/records/ packages/records/
COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/kvstore/ packages/kvstore/
COPY packages/jobs/ packages/jobs/
COPY packages/logs/ packages/logs/
COPY packages/runner/ packages/runner/
Expand Down
15 changes: 4 additions & 11 deletions packages/jobs/lib/runner/runner.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { KVStore } from '@nangohq/shared/lib/utils/kvstore/KVStore.js';
import { LocalRunner } from './local.runner.js';
import { RenderRunner } from './render.runner.js';
import { RemoteRunner } from './remote.runner.js';
import { isEnterprise, env, getLogger } from '@nangohq/utils';
import { getRedisUrl, InMemoryKVStore, RedisKVStore } from '@nangohq/shared';
import type { ProxyAppRouter } from '@nangohq/nango-runner';
import type { KVStore } from '@nangohq/kvstore';
import { createKVStore } from '@nangohq/kvstore';

const logger = getLogger('Runner');

Expand Down Expand Up @@ -111,7 +111,7 @@ class RunnerCache {

async set(runner: Runner): Promise<void> {
const ttl = 7 * 24 * 60 * 60 * 1000; // 7 days
await this.store.set(this.cacheKey(runner.id), JSON.stringify(runner), true, ttl);
await this.store.set(this.cacheKey(runner.id), JSON.stringify(runner), { canOverride: true, ttlInMs: ttl });
}

async delete(runnerId: string): Promise<void> {
Expand All @@ -120,13 +120,6 @@ class RunnerCache {
}

const runnersCache = await (async () => {
let store: KVStore;
const url = getRedisUrl();
if (url) {
store = new RedisKVStore(url);
await (store as RedisKVStore).connect();
} else {
store = new InMemoryKVStore();
}
const store = await createKVStore();
return new RunnerCache(store);
})();
1 change: 1 addition & 0 deletions packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
},
"dependencies": {
"@nangohq/data-ingestion": "file:../data-ingestion",
"@nangohq/kvstore": "file:../kvstore",
"@nangohq/logs": "file:../logs",
"@nangohq/nango-orchestrator": "file:../orchestrator",
"@nangohq/nango-runner": "file:../runner",
Expand Down
3 changes: 3 additions & 0 deletions packages/jobs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
"outDir": "dist"
},
"references": [
{
"path": "../kvstore"
},
{
"path": "../logs"
},
Expand Down
39 changes: 39 additions & 0 deletions packages/kvstore/lib/FeatureFlags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { getLogger } from '@nangohq/utils';
import type { KVStore } from './KVStore.js';

const logger = getLogger('FeatureFlags');

export class FeatureFlags {
kvstore: KVStore | undefined;

constructor(kvstore: KVStore | undefined) {
if (!kvstore) {
logger.error('Feature flags not enabled');
}

this.kvstore = kvstore;
}

async isEnabled({
key,
distinctId,
fallback,
isExcludingFlag = false
}: {
key: string;
distinctId: string;
fallback: boolean;
isExcludingFlag?: boolean;
}): Promise<boolean> {
if (!this.kvstore) {
return fallback;
}

try {
const exists = await this.kvstore.exists(`flag:${key}:${distinctId}`);
return isExcludingFlag ? !exists : exists;
} catch {
return fallback;
}
}
}
53 changes: 53 additions & 0 deletions packages/kvstore/lib/InMemoryStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { KVStore } from './KVStore.js';

interface Value {
value: string;
timestamp: number;
ttlInMs: number;
}

export class InMemoryKVStore implements KVStore {
private store: Map<string, Value>;

constructor() {
this.store = new Map();
}

public async get(key: string): Promise<string | null> {
const res = this.store.get(key);
if (res === undefined) {
return null;
}
if (this.isExpired(res)) {
this.store.delete(key);
return null;
}
return Promise.resolve(res.value);
}

public async set(key: string, value: string, opts?: { canOverride?: boolean; ttlInMs?: number }): Promise<void> {
const res = this.store.get(key);
const isExpired = res && this.isExpired(res);
if (isExpired || opts?.canOverride || res === undefined) {
this.store.set(key, { value: value, timestamp: Date.now(), ttlInMs: opts?.ttlInMs || 0 });
return Promise.resolve();
}
return Promise.reject(new Error('Key already exists'));
}

public async delete(key: string): Promise<void> {
this.store.delete(key);
return Promise.resolve();
}

public async exists(key: string): Promise<boolean> {
return Promise.resolve(this.store.has(key));
}

private isExpired(value: Value): boolean {
if (value.ttlInMs > 0 && value.timestamp + value.ttlInMs < Date.now()) {
return true;
}
return false;
}
}
71 changes: 71 additions & 0 deletions packages/kvstore/lib/InMemoryStore.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { beforeEach, describe, expect, it } from 'vitest';
import { InMemoryKVStore } from './InMemoryStore.js';

describe('InMemoryKVStore', () => {
let store: InMemoryKVStore;
beforeEach(() => {
store = new InMemoryKVStore();
});

it('should set and get a value', async () => {
await store.set('key', 'value');
const value = await store.get('key');
expect(value).toEqual('value');
});

it('should return null for a non-existent key', async () => {
const value = await store.get('do-not-exist');
expect(value).toBeNull();
});

it('should allow overriding a key', async () => {
await store.set('key', 'value');
await store.set('key', 'value2', { canOverride: true });
const value = await store.get('key');
expect(value).toEqual('value2');
});

it('should not allow overriding a key', async () => {
await store.set('key', 'value');
await expect(store.set('key', 'value2', { canOverride: false })).rejects.toEqual(new Error('Key already exists'));
});

it('should return null for a key that has expired', async () => {
const ttlInMs = 1000;
await store.set('key', 'value', { canOverride: true, ttlInMs });
await new Promise((resolve) => setTimeout(resolve, ttlInMs * 2));
const value = await store.get('key');
expect(value).toBeNull();
});

it('should not return null for a key that has not expired', async () => {
const ttlInMs = 2000;
await store.set('key', 'value', { canOverride: true, ttlInMs });
await new Promise((resolve) => setTimeout(resolve, ttlInMs / 2));
const value = await store.get('key');
expect(value).toEqual('value');
});

it('should allow setting an expired key', async () => {
await store.set('key', 'value', { canOverride: false, ttlInMs: 10 });
await new Promise((resolve) => setTimeout(resolve, 20));
await expect(store.set('key', 'value', { canOverride: false })).resolves.not.toThrow();
});

it('should allow setting a key with a TTL of 0', async () => {
await store.set('key', 'value', { canOverride: true, ttlInMs: 0 });
const value = await store.get('key');
expect(value).toEqual('value');
});

it('should allow deleting a key', async () => {
await store.delete('key');
const value = await store.get('key');
expect(value).toBeNull();
});
it('should allow checking if a key exists', async () => {
await expect(store.exists('key')).resolves.toEqual(false);
await store.set('key', 'value');
await expect(store.exists('key')).resolves.toEqual(true);
});
});
6 changes: 6 additions & 0 deletions packages/kvstore/lib/KVStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface KVStore {
set(key: string, value: string, options?: { canOverride?: boolean; ttlInMs?: number }): Promise<void>;
get(key: string): Promise<string | null>;
delete(key: string): Promise<void>;
exists(key: string): Promise<boolean>;
}
47 changes: 47 additions & 0 deletions packages/kvstore/lib/RedisStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { KVStore } from './KVStore.js';
import { createClient } from 'redis';
import type { RedisClientType } from 'redis';

export class RedisKVStore implements KVStore {
private client: RedisClientType;

constructor(url: string) {
this.client = createClient({ url: url });

this.client.on('error', (err) => {
console.error(`Redis (kvstore) error: ${err}`);
});
}

public async connect(): Promise<void> {
return this.client.connect().then(() => {});
}

public async get(key: string): Promise<string | null> {
return this.client.get(key);
}

public async set(key: string, value: string, opts?: { canOverride?: boolean; ttlInMs?: number }): Promise<void> {
const options: any = {};
if (opts) {
if (opts.ttlInMs && opts.ttlInMs > 0) {
options['PX'] = opts.ttlInMs;
}
if (opts.canOverride === false) {
options['NX'] = true;
}
}
const res = await this.client.set(key, value, options);
if (res !== 'OK') {
throw new Error(`Failed to set key: ${key}, value: ${value}, ${JSON.stringify(options)}`);
}
}

public async exists(key: string): Promise<boolean> {
return (await this.client.exists(key)) > 0;
}

public async delete(key: string): Promise<void> {
await this.client.del(key);
}
}
17 changes: 17 additions & 0 deletions packages/kvstore/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { InMemoryKVStore } from './InMemoryStore.js';
import { RedisKVStore } from './RedisStore.js';

export { InMemoryKVStore } from './InMemoryStore.js';
export { RedisKVStore } from './RedisStore.js';
export type { KVStore } from './KVStore.js';

export async function createKVStore() {
const url = process.env['NANGO_REDIS_URL'];
if (url) {
const store = new RedisKVStore(url);
await store.connect();
return store;
}

return new InMemoryKVStore();
}
26 changes: 26 additions & 0 deletions packages/kvstore/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "@nangohq/kvstore",
"version": "1.0.0",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.js",
"private": true,
"scripts": {
"build": "rimraf ./dist && tsc"
},
"repository": {
"type": "git",
"url": "git+https://github.com/NangoHQ/nango.git",
"directory": "packages/logs"
},
"dependencies": {
"@nangohq/utils": "file:../utils",
"redis": "4.6.13"
},
"devDependencies": {
"vitest": "0.33.0"
},
"files": [
"dist/**/*"
]
}
Loading

0 comments on commit 376c5b1

Please sign in to comment.