From 5242ab940eecd466f85345f55d0ed60d18b7a774 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 14 Nov 2018 10:18:26 -0800 Subject: [PATCH] prepare 5.6.0 release (#128) --- caching_store_wrapper.js | 143 +++++++++++ package-lock.json | 13 +- package.json | 2 +- redis_feature_store.js | 156 +++--------- test/caching_store_wrapper-test.js | 386 +++++++++++++++++++++++++++++ test/feature_store-test.js | 4 +- test/feature_store_test_base.js | 163 +++++++++++- test/redis_feature_store-test.js | 66 ++--- test/update_queue-test.js | 61 +++++ update_queue.js | 29 +++ 10 files changed, 846 insertions(+), 177 deletions(-) create mode 100644 caching_store_wrapper.js create mode 100644 test/caching_store_wrapper-test.js create mode 100644 test/update_queue-test.js create mode 100644 update_queue.js diff --git a/caching_store_wrapper.js b/caching_store_wrapper.js new file mode 100644 index 0000000..ec71f54 --- /dev/null +++ b/caching_store_wrapper.js @@ -0,0 +1,143 @@ +var NodeCache = require('node-cache'), + dataKind = require('./versioned_data_kind'), + UpdateQueue = require('./update_queue'); + +function cacheKey(kind, key) { + return kind.namespace + ":" + key; +} + +function allCacheKey(kind) { + return "$all:" + kind.namespace; +} + +var initializedKey = "$checkedInit"; + +/* + CachingStoreWrapper provides commonly needed functionality for implementations of an + SDK feature store. The underlyingStore must implement a simplified interface for + querying and updating the data store (see redis_feature_store.js for an example) + while CachingStoreWrapper adds optional caching of stored items and of the + initialized state, and ensures that asynchronous operations are serialized correctly. +*/ +function CachingStoreWrapper(underlyingStore, ttl) { + var cache = ttl ? new NodeCache({ stdTTL: ttl }) : null; + var queue = new UpdateQueue(); + var initialized = false; + + this.underlyingStore = underlyingStore; + + this.init = function(allData, cb) { + queue.enqueue(function(cb) { + underlyingStore.initInternal(allData, function() { + initialized = true; + + if (cache) { + cache.del(initializedKey); + cache.flushAll(); + + // populate cache with initial data + for (var kindNamespace in allData) { + if (Object.hasOwnProperty.call(allData, kindNamespace)) { + var kind = dataKind[kindNamespace]; + var items = allData[kindNamespace]; + cache.set(allCacheKey(kind), items); + for (var key in items) { + cache.set(cacheKey(kind, key), items[key]); + } + } + } + } + + cb(); + }); + }, [], cb); + }; + + this.initialized = function(cb) { + if (initialized) { + cb(true); + } else if (cache && cache.get(initializedKey)) { + cb(false); + } else { + underlyingStore.initializedInternal(function(inited) { + initialized = inited; + if (!initialized) { + cache && cache.set(initializedKey, true); + } + cb(initialized); + }); + } + }; + + this.all = function(kind, cb) { + var items = cache && cache.get(allCacheKey(kind)); + if (items) { + cb(items); + return; + } + + underlyingStore.getAllInternal(kind, function(items) { + var filteredItems = {}; + Object.keys(items).forEach(function(key) { + var item = items[key]; + if (item && !item.deleted) { + filteredItems[key] = item; + } + }); + cache && cache.set(allCacheKey(kind), filteredItems); + cb(filteredItems); + }); + }; + + this.get = function(kind, key, cb) { + if (cache) { + var item = cache.get(cacheKey(kind, key)); + if (item !== undefined) { + cb(itemOnlyIfNotDeleted(item)); + return; + } + } + + underlyingStore.getInternal(kind, key, function(item) { + cache && cache.set(cacheKey(kind, key), item); + cb(itemOnlyIfNotDeleted(item)); + }); + }; + + function itemOnlyIfNotDeleted(item) { + return (!item || item.deleted) ? null : item; + } + + this.upsert = function(kind, newItem, cb) { + queue.enqueue(function (cb) { + flushAllCaches(); + underlyingStore.upsertInternal(kind, newItem, function(err, updatedItem) { + if (!err) { + cache && cache.set(cacheKey(kind, newItem.key), updatedItem); + } + cb(); + }); + }, [], cb); + }; + + this.delete = function(kind, key, version, cb) { + this.upsert(kind, { key: key, version: version, deleted: true }, cb); + }; + + this.close = function() { + cache.close(); + underlyingStore.close(); + }; + + function flushAllCaches() { + if (!cache) { + return; + } + for (var kindNamespace in dataKind) { + cache.del(allCacheKey(dataKind[kindNamespace])); + } + } +} + +module.exports = CachingStoreWrapper; + diff --git a/package-lock.json b/package-lock.json index e841e98..45308a6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "ldclient-node", - "version": "5.4.2", + "version": "5.5.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -3855,9 +3855,9 @@ } }, "merge": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/merge/-/merge-1.2.0.tgz", - "integrity": "sha1-dTHjnUlJwoGma4xabgJl6LBYlNo=", + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/merge/-/merge-1.2.1.tgz", + "integrity": "sha512-VjFo4P5Whtj4vsLzsYBu5ayHhoHJ0UqNm7ibvShmbmoz7tGi0vXaoJbGdB+GmDMLUdg8DpQXEIeVDAe8MaABvQ==", "dev": true }, "merge-stream": { @@ -5845,8 +5845,9 @@ "dev": true }, "tunnel": { - "version": "https://github.com/launchdarkly/node-tunnel/tarball/d860e57650cce1ea655d00854c81babe6b47e02c", - "integrity": "sha1-DxkgfzcgRtPUaCGDy+INSgR8zdk=" + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/tunnel/-/tunnel-0.0.6.tgz", + "integrity": "sha512-1h/Lnq9yajKY2PEbBadPXj3VxsDDu844OnaAo52UVmIzIvwwtBPIuNvkjuzBlTWpfJyUbG3ez0KSBibQkj4ojg==" }, "tunnel-agent": { "version": "0.6.0", diff --git a/package.json b/package.json index 8c00a3d..9ab7c3f 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "request": "2.87.0", "request-etag": "^2.0.3", "semver": "5.5.0", - "tunnel": "https://github.com/launchdarkly/node-tunnel/tarball/d860e57650cce1ea655d00854c81babe6b47e02c", + "tunnel": "0.0.6", "winston": "2.4.1" }, "engines": { diff --git a/redis_feature_store.js b/redis_feature_store.js index 7b35504..009aae1 100644 --- a/redis_feature_store.js +++ b/redis_feature_store.js @@ -1,21 +1,22 @@ var redis = require('redis'), - NodeCache = require( "node-cache" ), winston = require('winston'), - dataKind = require('./versioned_data_kind'); + dataKind = require('./versioned_data_kind'), + CachingStoreWrapper = require('./caching_store_wrapper'); var noop = function(){}; function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { + return new CachingStoreWrapper(new redisFeatureStoreInternal(redisOpts, prefix, logger), cacheTTL); +} + +function redisFeatureStoreInternal(redisOpts, prefix, logger) { var client = redis.createClient(redisOpts), store = {}, itemsPrefix = (prefix || "launchdarkly") + ":", - cache = cacheTTL ? new NodeCache({ stdTTL: cacheTTL}) : null, - updateQueue = [], - inited = false, - checkedInit = false; + initedKey = itemsPrefix + "$inited"; logger = (logger || new winston.Logger({ @@ -55,23 +56,11 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { return itemsPrefix + kind.namespace; } - function cacheKey(kind, key) { - return kind.namespace + ":" + key; - } - // A helper that performs a get with the redis client function doGet(kind, key, cb) { var item; cb = cb || noop; - if (cacheTTL) { - item = cache.get(cacheKey(kind, key)); - if (item) { - cb(item); - return; - } - } - if (!connected) { logger.warn('Attempted to fetch key ' + key + ' while Redis connection is down'); cb(null); @@ -89,33 +78,7 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { }); } - function executePendingUpdates() { - if (updateQueue.length > 0) { - const entry = updateQueue[0]; - const fn = entry[0]; - const args = entry[1]; - const cb = entry[2]; - const newCb = function() { - updateQueue.shift(); - if (updateQueue.length > 0) { - setImmediate(executePendingUpdates); - } - cb && cb(); - }; - fn.apply(store, args.concat([newCb])); - } - } - - // Places an update operation on the queue. - var serializeFn = function(updateFn, fnArgs, cb) { - updateQueue.push([updateFn, fnArgs, cb]); - if (updateQueue.length == 1) { - // if nothing else is in progress, we can start this one right away - executePendingUpdates(); - } - }; - - store.get = function(kind, key, cb) { + store.getInternal = function(kind, key, cb) { cb = cb || noop; doGet(kind, key, function(item) { if (item && !item.deleted) { @@ -126,7 +89,7 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { }); }; - store.all = function(kind, cb) { + store.getAllInternal = function(kind, cb) { cb = cb || noop; if (!connected) { logger.warn('Attempted to fetch all keys while Redis connection is down'); @@ -143,11 +106,8 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { items = obj; for (var key in items) { - if (Object.hasOwnProperty.call(items,key)) { - var item = JSON.parse(items[key]); - if (!item.deleted) { - results[key] = item; - } + if (Object.hasOwnProperty.call(items, key)) { + results[key] = JSON.parse(items[key]); } } cb(results); @@ -155,17 +115,9 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { }); }; - store.init = function(allData, cb) { - serializeFn(store._init, [allData], cb); - }; - - store._init = function(allData, cb) { + store.initInternal = function(allData, cb) { var multi = client.multi(); - if (cacheTTL) { - cache.flushAll(); - } - for (var kindNamespace in allData) { if (Object.hasOwnProperty.call(allData, kindNamespace)) { var kind = dataKind[kindNamespace]; @@ -177,9 +129,6 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { if (Object.hasOwnProperty.call(items, key)) { stringified[key] = JSON.stringify(items[key]); } - if (cacheTTL) { - cache.set(cacheKey(kind, key), items[key]); - } } // Redis does not allow hmset() with an empty object if (Object.keys(stringified).length > 0) { @@ -188,66 +137,44 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { } } + multi.set(initedKey, ""); + multi.exec(function(err, replies) { if (err) { logger.error("Error initializing Redis store", err); - } else { - inited = true; } cb(); }); }; - store.delete = function(kind, key, version, cb) { - serializeFn(store._delete, [kind, key, version], cb); - }; - - store._delete = function(kind, key, version, cb) { - var deletedItem = { key: key, version: version, deleted: true }; - updateItemWithVersioning(kind, deletedItem, cb, - function(err) { - if (err) { - logger.error("Error deleting key " + key + " in '" + kind.namespace + "'", err); - } - }); - } - - store.upsert = function(kind, item, cb) { - serializeFn(store._upsert, [kind, item], cb); - }; - - store._upsert = function(kind, item, cb) { - updateItemWithVersioning(kind, item, cb, - function(err) { - if (err) { - logger.error("Error upserting key " + key + " in '" + kind.namespace + "'", err); - } - }); + store.upsertInternal = function(kind, item, cb) { + updateItemWithVersioning(kind, item, function(err, attemptedWrite) { + if (err) { + logger.error("Error upserting key " + key + " in '" + kind.namespace + "'", err); + } + cb(err, attemptedWrite); + }); } - function updateItemWithVersioning(kind, newItem, cb, resultFn) { + function updateItemWithVersioning(kind, newItem, cb) { client.watch(itemsKey(kind)); var multi = client.multi(); - // test_transaction_hook is instrumentation, set only by the unit tests - var prepare = store.test_transaction_hook || function(prepareCb) { prepareCb(); }; + // testUpdateHook is instrumentation, used only by the unit tests + var prepare = store.testUpdateHook || function(prepareCb) { prepareCb(); }; prepare(function() { doGet(kind, newItem.key, function(oldItem) { if (oldItem && oldItem.version >= newItem.version) { multi.discard(); - cb(); + cb(null, oldItem); } else { multi.hset(itemsKey(kind), newItem.key, JSON.stringify(newItem)); multi.exec(function(err, replies) { if (!err && replies === null) { // This means the EXEC failed because someone modified the watched key logger.debug("Concurrent modification detected, retrying"); - updateItemWithVersioning(kind, newItem, cb, resultFn); + updateItemWithVersioning(kind, newItem, cb); } else { - resultFn(err); - if (!err && cacheTTL) { - cache.set(cacheKey(kind, newItem.key), newItem); - } - cb(); + cb(err, newItem); } }); } @@ -255,37 +182,18 @@ function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger) { }); } - store.initialized = function(cb) { + store.initializedInternal = function(cb) { cb = cb || noop; - if (inited) { - // Once we've determined that we're initialized, we can never become uninitialized again - cb(true); - } - else if (checkedInit) { - // We don't want to hit Redis for this question more than once; if we've already checked there - // and it wasn't populated, we'll continue to say we're uninited until init() has been called - cb(false); - } - else { - var inited = false; - client.exists(itemsKey(dataKind.features), function(err, obj) { - if (!err && obj) { - inited = true; - } - checkedInit = true; - cb(inited); - }); - } + client.exists(initedKey, function(err, obj) { + cb(Boolean(!err && obj)); + }); }; store.close = function() { client.quit(); - if (cacheTTL) { - cache.close(); - } }; return store; } -module.exports = RedisFeatureStore; \ No newline at end of file +module.exports = RedisFeatureStore; diff --git a/test/caching_store_wrapper-test.js b/test/caching_store_wrapper-test.js new file mode 100644 index 0000000..fd6169c --- /dev/null +++ b/test/caching_store_wrapper-test.js @@ -0,0 +1,386 @@ +var CachingStoreWrapper = require('../caching_store_wrapper'); +var features = require('../versioned_data_kind').features; + +function MockCore() { + const c = { + data: { features: {} }, + inited: false, + initQueriedCount: 0, + + initInternal: function(newData, cb) { + c.data = newData; + cb(); + }, + + getInternal: function(kind, key, cb) { + cb(c.data[kind.namespace][key]); + }, + + getAllInternal: function(kind, cb) { + cb(c.data[kind.namespace]); + }, + + upsertInternal: function(kind, item, cb) { + const oldItem = c.data[kind.namespace][item.key]; + if (oldItem && oldItem.version >= item.version) { + cb(null, oldItem); + } else { + c.data[kind.namespace][item.key] = item; + cb(null, item); + } + }, + + initializedInternal: function(cb) { + c.initQueriedCount++; + cb(c.inited); + }, + + forceSet: function(kind, item) { + c.data[kind.namespace][item.key] = item; + }, + + forceRemove: function(kind, key) { + delete c.data[kind.namespace][key]; + } + }; + return c; +} + +const cacheSeconds = 15; + +function runCachedAndUncachedTests(name, testFn) { + describe(name, function() { + const core1 = MockCore(); + const wrapper1 = new CachingStoreWrapper(core1, cacheSeconds); + it('cached', function(done) { testFn(done, wrapper1, core1, true); }); + + const core2 = MockCore(); + const wrapper2 = new CachingStoreWrapper(core2, 0); + it('uncached', function(done) { testFn(done, wrapper2, core2, false); }); + }); +} + +function runCachedTestOnly(name, testFn) { + it(name, function(done) { + const core = MockCore(); + const wrapper = new CachingStoreWrapper(core, cacheSeconds); + testFn(done, wrapper, core); + }); +} + +describe('CachingStoreWrapper', function() { + + runCachedAndUncachedTests('get()', function(done, wrapper, core, isCached) { + const flagv1 = { key: 'flag', version: 1 }; + const flagv2 = { key: 'flag', version: 2 }; + + core.forceSet(features, flagv1); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(flagv1); + + core.forceSet(features, flagv2); // Make a change that bypasses the cache + + wrapper.get(features, flagv1.key, function(item) { + // If cached, it should return the cached value rather than calling the underlying getter + expect(item).toEqual(isCached ? flagv1 : flagv2); + + done(); + }); + }); + }); + + runCachedAndUncachedTests('get() with deleted item', function(done, wrapper, core, isCached) { + const flagv1 = { key: 'flag', version: 1, deleted: true }; + const flagv2 = { key: 'flag', version: 2, deleted: false }; + + core.forceSet(features, flagv1); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toBe(null); + + core.forceSet(features, flagv2); // Make a change that bypasses the cache + + wrapper.get(features, flagv2.key, function(item) { + // If cached, the deleted state should persist in the cache + expect(item).toEqual(isCached ? null : flagv2); + + done(); + }); + }); + }); + + runCachedAndUncachedTests('get() with missing item', function(done, wrapper, core, isCached) { + const flag = { key: 'flag', version: 1 }; + + wrapper.get(features, flag.key, function(item) { + expect(item).toBe(null); + + core.forceSet(features, flag); + + wrapper.get(features, flag.key, function(item) { + // If cached, the previous null result should persist in the cache + expect(item).toEqual(isCached ? null : flag); + + done(); + }); + }); + }); + + runCachedTestOnly('cached get() uses values from init()', function(done, wrapper, core) { + const flagv1 = { key: 'flag', version: 1 }; + const flagv2 = { key: 'flag', version: 2 }; + + const allData = { features: { 'flag': flagv1 } }; + + wrapper.init(allData, function() { + expect(core.data).toEqual(allData); + + core.forceSet(features, flagv2); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(flagv1); + + done(); + }); + }); + }); + + runCachedAndUncachedTests('all()', function(done, wrapper, core, isCached) { + const flag1 = { key: 'flag1', version: 1 }; + const flag2 = { key: 'flag2', version: 1 }; + + core.forceSet(features, flag1); + core.forceSet(features, flag2); + + wrapper.all(features, function(items) { + expect(items).toEqual({ 'flag1': flag1, 'flag2': flag2 }); + + core.forceRemove(features, flag2.key); + + wrapper.all(features, function(items) { + if (isCached) { + expect(items).toEqual({ 'flag1': flag1, 'flag2': flag2 }); + } else { + expect(items).toEqual({ 'flag1': flag1 }); + } + + done(); + }); + }); + }); + + runCachedAndUncachedTests('all() with deleted item', function(done, wrapper, core, isCached) { + const flag1 = { key: 'flag1', version: 1 }; + const flag2 = { key: 'flag2', version: 1, deleted: true }; + + core.forceSet(features, flag1); + core.forceSet(features, flag2); + + wrapper.all(features, function(items) { + expect(items).toEqual({ 'flag1': flag1 }); + + core.forceRemove(features, flag1.key); + + wrapper.all(features, function(items) { + if (isCached) { + expect(items).toEqual({ 'flag1': flag1 }); + } else { + expect(items).toEqual({ }); + } + + done(); + }); + }); + }); + + runCachedTestOnly('cached all() uses values from init()', function(done, wrapper, core) { + const flag1 = { key: 'flag1', version: 1 }; + const flag2 = { key: 'flag2', version: 1 }; + + const allData = { features: { flag1: flag1, flag2: flag2 } }; + + wrapper.init(allData, function() { + core.forceRemove(features, flag2.key); + + wrapper.all(features, function(items) { + expect(items).toEqual({ flag1: flag1, flag2: flag2 }); + + done(); + }); + }); + }); + + runCachedTestOnly('cached all() uses fresh values if there has been an update', function(done, wrapper, core) { + const flag1v1 = { key: 'flag1', version: 1 }; + const flag1v2 = { key: 'flag1', version: 2 }; + const flag2v1 = { key: 'flag2', version: 1 }; + const flag2v2 = { key: 'flag2', version: 2 }; + + const allData = { features: { flag1: flag1v1, flag2: flag2v2 } }; + + wrapper.init(allData, function() { + expect(core.data).toEqual(allData); + + // make a change to flag1 using the wrapper - this should flush the cache + wrapper.upsert(features, flag1v2, function() { + // make a change to flag2 that bypasses the cache + core.forceSet(features, flag2v2); + + // we should now see both changes since the cache was flushed + wrapper.all(features, function(items) { + expect(items).toEqual({ flag1: flag1v2, flag2: flag2v2 }); + + done(); + }); + }); + }); + }); + + runCachedAndUncachedTests('upsert() - successful', function(done, wrapper, core, isCached) { + const flagv1 = { key: 'flag', version: 1 }; + const flagv2 = { key: 'flag', version: 2 }; + + wrapper.upsert(features, flagv1, function() { + expect(core.data[features.namespace][flagv1.key]).toEqual(flagv1); + + wrapper.upsert(features, flagv2, function() { + expect(core.data[features.namespace][flagv1.key]).toEqual(flagv2); + + // if we have a cache, verify that the new item is now cached by writing a different value + // to the underlying data - get() should still return the cached item + if (isCached) { + const flagv3 = { key: 'flag', version: 3 }; + core.forceSet(features, flagv3); + } + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(flagv2); + + done(); + }); + }); + }); + }); + + runCachedTestOnly('cached upsert() - unsuccessful', function(done, wrapper, core) { + const flagv1 = { key: 'flag', version: 1 }; + const flagv2 = { key: 'flag', version: 2 }; + + core.forceSet(features, flagv2); // this is now in the underlying data, but not in the cache + + wrapper.upsert(features, flagv1, function() { + expect(core.data[features.namespace][flagv1.key]).toEqual(flagv2); // value in store remains the same + + // the cache should now contain flagv2 - check this by making another change that bypasses + // the cache, and verifying that get() uses the cached value instead + const flagv3 = { key: 'flag', version: 3 }; + core.forceSet(features, flagv3); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(flagv2); + + done(); + }); + }); + }); + + runCachedAndUncachedTests('delete()', function(done, wrapper, core, isCached) { + const flagv1 = { key: 'flag', version: 1 }; + const flagv2 = { key: 'flag', version: 2, deleted: true }; + const flagv3 = { key: 'flag', version: 3 }; + + core.forceSet(features, flagv1); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(flagv1); + + wrapper.delete(features, flagv1.key, flagv2.version); + + expect(core.data[features.namespace][flagv1.key]).toEqual(flagv2); + + // make a change to the flag that bypasses the cache + core.forceSet(features, flagv3); + + wrapper.get(features, flagv1.key, function(item) { + expect(item).toEqual(isCached ? null : flagv3); + + done(); + }); + }); + }); + + describe('initialized()', function() { + it('calls underlying initialized() only if not already inited', function(done) { + const core = MockCore(); + const wrapper = new CachingStoreWrapper(core, 0); + + wrapper.initialized(function(value) { + expect(value).toEqual(false); + expect(core.initQueriedCount).toEqual(1); + + core.inited = true; + + wrapper.initialized(function(value) { + expect(value).toEqual(true); + expect(core.initQueriedCount).toEqual(2); + + core.inited = false; // this should have no effect since we already returned true + + wrapper.initialized(function(value) { + expect(value).toEqual(true); + expect(core.initQueriedCount).toEqual(2); + + done(); + }); + }); + }); + }); + + it('will not call initialized() if init() has been called', function(done) { + const core = MockCore(); + const wrapper = new CachingStoreWrapper(core, 0); + + wrapper.initialized(function(value) { + expect(value).toEqual(false); + expect(core.initQueriedCount).toEqual(1); + + const allData = { features: {} }; + wrapper.init(allData, function() { + wrapper.initialized(function(value) { + expect(value).toEqual(true); + expect(core.initQueriedCount).toEqual(1); + + done(); + }); + }); + }); + }); + + it('can cache false result', function(done) { + const core = MockCore(); + const wrapper = new CachingStoreWrapper(core, 1); // cache TTL = 1 second + + wrapper.initialized(function(value) { + expect(value).toEqual(false); + expect(core.initQueriedCount).toEqual(1); + + core.inited = true; + + wrapper.initialized(function(value) { + expect(value).toEqual(false); + expect(core.initQueriedCount).toEqual(1); + + setTimeout(function() { + wrapper.initialized(function(value) { + expect(value).toEqual(true); + expect(core.initQueriedCount).toEqual(2); + + done(); + }); + }, 1100); + }); + }); + }); + }); +}); diff --git a/test/feature_store-test.js b/test/feature_store-test.js index 4ca19ce..71ab1db 100644 --- a/test/feature_store-test.js +++ b/test/feature_store-test.js @@ -1,8 +1,8 @@ var InMemoryFeatureStore = require('../feature_store'); -var allFeatureStoreTests = require('./feature_store_test_base'); +var testBase = require('./feature_store_test_base'); describe('InMemoryFeatureStore', function() { - allFeatureStoreTests(function() { + testBase.baseFeatureStoreTests(function() { return new InMemoryFeatureStore(); }) }); diff --git a/test/feature_store_test_base.js b/test/feature_store_test_base.js index 5bb874e..f253e0b 100644 --- a/test/feature_store_test_base.js +++ b/test/feature_store_test_base.js @@ -1,6 +1,17 @@ var dataKind = require('../versioned_data_kind'); -function allFeatureStoreTests(makeStore) { +// The following tests should be run on every feature store implementation. If this type of +// store supports caching, the tests should be run once with caching enabled and once with +// caching disabled. +// +// Parameters: +// - makeStore(): creates an instance of the feature store. +// - clearExistingData(callback): if specified, will be called before each test to clear any +// storage that the store instances may be sharing. +// - isCached: true if the instances returned by makeStore() have caching enabled. If +// applicable, + +function baseFeatureStoreTests(makeStore, clearExistingData, isCached) { var feature1 = { key: 'foo', version: 10 @@ -10,6 +21,14 @@ function allFeatureStoreTests(makeStore) { version: 10 }; + beforeEach(function(done) { + if (clearExistingData) { + clearExistingData(done); + } else { + done(); + } + }); + function initedStore(cb) { var store = makeStore(); var initData = {}; @@ -31,6 +50,70 @@ function allFeatureStoreTests(makeStore) { }); }); + it('init() completely replaces previous data', function(done) { + var store = makeStore(); + var flags = { + first: { key: 'first', version: 1 }, + second: { key: 'second', version: 1 } + }; + var segments = { first: { key: 'first', version: 2 } }; + var initData = {}; + initData[dataKind.features.namespace] = flags; + initData[dataKind.segments.namespace] = segments; + + store.init(initData, function() { + store.all(dataKind.features, function(items) { + expect(items).toEqual(flags); + store.all(dataKind.segments, function(items) { + expect(items).toEqual(segments); + + var newFlags = { first: { key: 'first', version: 3 } }; + var newSegments = { first: { key: 'first', version: 4 } }; + var initData = {}; + initData[dataKind.features.namespace] = newFlags; + initData[dataKind.segments.namespace] = newSegments; + + store.init(initData, function() { + store.all(dataKind.features, function(items) { + expect(items).toEqual(newFlags); + store.all(dataKind.segments, function(items) { + expect(items).toEqual(newSegments); + + done(); + }) + }) + }); + }); + }); + }); + }); + + if (!isCached && clearExistingData) { + function testInitStateDetection(desc, initData) { + it(desc, function(done) { + var store1 = makeStore(); + var store2 = makeStore(); + + store1.initialized(function(result) { + expect(result).toBe(false); + + store2.init(initData, function() { + store1.initialized(function(result) { + expect(result).toBe(true); + done(); + }); + }); + }); + }); + } + + testInitStateDetection('can detect if another instance has initialized the store', + { features: { foo: feature1 } }); + + testInitStateDetection('can detect if another instance has initialized the store, even with empty data', + { features: {} }); + } + it('gets existing feature', function(done) { initedStore(function(store) { store.get(dataKind.features, feature1.key, function(result) { @@ -165,4 +248,80 @@ function allFeatureStoreTests(makeStore) { }); } -module.exports = allFeatureStoreTests; +// The following tests require that the feature store can be instrumented in such a way as to run +// some test code in the middle of an upsert operation. +// +// Parameters: +// - makeStore(): creates a normal feature store. +// - makeStoreWithHook(hook): creates a feature store that operates on the same underlying data as +// the first store. This store will call the hook function (passing a callback) immediately before +// it attempts to make any update. + +function concurrentModificationTests(makeStore, makeStoreWithHook) { + + var flagKey = 'flag'; + var initialVersion = 1; + + var competingStore = makeStore(); + + function makeFlagWithVersion(v) { + return { key: flagKey, version: v }; + } + + function withInitedStore(store, cb) { + var allData = { features: {} }; + allData['features'][flagKey] = makeFlagWithVersion(initialVersion); + store.init(allData, cb); + } + + function writeCompetingVersions(flagVersionsToWrite) { + var i = 0; + return function(callback) { + if (i < flagVersionsToWrite.length) { + var newFlag = makeFlagWithVersion(flagVersionsToWrite[i]); + i++; + competingStore.upsert(dataKind.features, newFlag, callback); + } else { + callback(); + } + }; + } + + it('handles upsert race condition against other client with lower version', function(done) { + var myDesiredVersion = 10; + var competingStoreVersions = [ 2, 3, 4 ]; // proves that we can retry multiple times if necessary + + var myStore = makeStoreWithHook(writeCompetingVersions(competingStoreVersions)); + + withInitedStore(myStore, function() { + myStore.upsert(dataKind.features, makeFlagWithVersion(myDesiredVersion), function() { + myStore.get(dataKind.features, flagKey, function(result) { + expect(result.version).toEqual(myDesiredVersion); + done(); + }); + }); + }); + }); + + it('handles upsert race condition against other client with higher version', function(done) { + var myDesiredVersion = 2; + var competingStoreVersion = 3; + + var myStore = makeStoreWithHook(writeCompetingVersions([ competingStoreVersion ])); + + withInitedStore(myStore, function() { + myStore.upsert(dataKind.features, makeFlagWithVersion(myDesiredVersion), function() { + myStore.get(dataKind.features, flagKey, function(result) { + expect(result.version).toEqual(competingStoreVersion); + done(); + }); + }); + }); + }); +} + +module.exports = { + baseFeatureStoreTests: baseFeatureStoreTests, + concurrentModificationTests: concurrentModificationTests +}; + diff --git a/test/redis_feature_store-test.js b/test/redis_feature_store-test.js index 282d9dd..1ba8837 100644 --- a/test/redis_feature_store-test.js +++ b/test/redis_feature_store-test.js @@ -1,54 +1,36 @@ var RedisFeatureStore = require('../redis_feature_store'); -var allFeatureStoreTests = require('./feature_store_test_base'); +var testBase = require('./feature_store_test_base'); var dataKind = require('../versioned_data_kind'); var redis = require('redis'); describe('RedisFeatureStore', function() { var redisOpts = { url: 'redis://localhost:6379' }; - function makeStore() { - return new RedisFeatureStore(redisOpts, 30000); + var extraRedisClient = redis.createClient(redisOpts); + + function makeCachedStore() { + return new RedisFeatureStore(redisOpts, 30); + } + + function makeUncachedStore() { + return new RedisFeatureStore(redisOpts, 0); } - allFeatureStoreTests(makeStore); - - it('handles upsert race condition against external client correctly', function(done) { - var store = makeStore(); - var otherClient = redis.createClient(redisOpts); - - var feature1 = { - key: 'foo', - version: 1 - }; - var intermediateVer = { key: feature1.key, version: feature1.version }; - var finalVer = { key: feature1.key, version: 10 }; - - var initData = {}; - initData[dataKind.features.namespace] = { - 'foo': feature1 - }; - - store.init(initData, function() { - var tries = 0; - // This function will be called in between the WATCH and the update transaction. - // We're testing that the store will detect this concurrent modification and will - // transparently retry the update. - store.test_transaction_hook = function(cb) { - if (tries < 3) { - tries++; - intermediateVer.version++; - otherClient.hset("launchdarkly:features", "foo", JSON.stringify(intermediateVer), cb); - } else { - cb(); - } - }; - store.upsert(dataKind.features, finalVer, function() { - store.get(dataKind.features, feature1.key, function(result) { - otherClient.quit(); - expect(result).toEqual(finalVer); - done(); - }); - }); + function clearExistingData(callback) { + extraRedisClient.flushdb(callback); + } + + testBase.baseFeatureStoreTests(makeCachedStore, clearExistingData, true); + testBase.baseFeatureStoreTests(makeUncachedStore, clearExistingData, false); + + testBase.concurrentModificationTests(makeUncachedStore, + function(hook) { + var store = makeCachedStore(); + store.underlyingStore.testUpdateHook = hook; + return store; }); + + afterAll(function() { + extraRedisClient.quit(); }); }); diff --git a/test/update_queue-test.js b/test/update_queue-test.js new file mode 100644 index 0000000..b182855 --- /dev/null +++ b/test/update_queue-test.js @@ -0,0 +1,61 @@ +var UpdateQueue = require('../update_queue'); + +describe('UpdateQueue', function() { + it('executes task immediately if there are no pending tasks', function(done) { + const q = new UpdateQueue(); + + var updated = false; + const updateFn = function(a, b, cb) { + expect(a).toEqual(1); + expect(b).toEqual(2); + updated = true; + cb(); + }; + + q.enqueue(updateFn, [1, 2], function() { + expect(updated).toEqual(true); + done(); + }); + }); + + it('serializes async tasks in the order submitted', function(done) { + const q = new UpdateQueue(); + + var progress = []; + + // This simulates a condition in which events are being received asynchronously and each + // event triggers an asynchronous task. We want to make sure that the tasks are executed in + // the order submitted, even if one is submitted during the execution of the previous one. + const taskFn = function(i, cb) { + progress.push('start ' + i); + // assume that we're doing something asynchronous here - make sure it takes a little time + setTimeout(cb, 20); + }; + + const expected = [ + 'submit 1', + 'start 1', // note, this one executes immediately because there was nothing pending + 'submit 2', + 'submit 3', + 'end 1', + 'start 2', + 'end 2', + 'start 3', + 'end 3' + ]; + + for (var i = 1; i <= 3; i++) { + const j = i; + setImmediate(function() { + progress.push('submit ' + j); + q.enqueue(taskFn, [j], function() { + progress.push('end ' + j); + if (j >= 3) { + expect(progress).toEqual(expected); + done(); + } + }); + }); + }; + }); +}); diff --git a/update_queue.js b/update_queue.js new file mode 100644 index 0000000..7c57fa4 --- /dev/null +++ b/update_queue.js @@ -0,0 +1,29 @@ + +function UpdateQueue() { + var updateQueue = []; + this.enqueue = function(updateFn, fnArgs, cb) { + updateQueue.push(arguments); + if (updateQueue.length === 1) { + // if nothing else is in progress, we can start this one right away + executePendingUpdates(); + } + }; + function executePendingUpdates() { + if (updateQueue.length > 0) { + const entry = updateQueue[0]; + const fn = entry[0]; + const args = entry[1]; + const cb = entry[2]; + const newCb = function() { + updateQueue.shift(); + if (updateQueue.length > 0) { + setImmediate(executePendingUpdates); + } + cb && cb(); + }; + fn.apply(null, args.concat([newCb])); + } + } +} + +module.exports = UpdateQueue;