Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
add rps counters
Browse files Browse the repository at this point in the history
  • Loading branch information
Russ Frank committed Aug 10, 2016
1 parent 6e06893 commit 133597d
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 0 deletions.
87 changes: 87 additions & 0 deletions rps_counters.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

/* eslint max-statements: [2, 40] */
var assert = require('assert');
var globalTimers = require('timers');

module.exports = RPSCounters;

var BUCKET_SIZE = 10 * 1000; // 10 second buckets

function RPSCounters(timers) {
this.aCounters = {};
this.bCounters = {};
this.timer = null;
this.curr = this.aCounters;
this.timers = timers || globalTimers;

this.counterKeys = [];

var self = this;
this.boundOnTimer = boundOnTimer;
function boundOnTimer() {
self.onTimer();
}
}

RPSCounters.prototype.getCounts = function getCounts() {
return this.curr === this.aCounters? this.bCounters : this.aCounters;
};

RPSCounters.prototype.bootstrap = function bootstrap() {
this.timer = this.timers.setTimeout(this.boundOnTimer, BUCKET_SIZE);
};

RPSCounters.prototype.onTimer = function onTimer() {
this.swap();
this.timer = this.timers.setTimeout(this.boundOnTimer, BUCKET_SIZE);
};

RPSCounters.prototype.destroy = function destroy() {
this.timers.clearTimeout(this.timer);
};

RPSCounters.prototype.inc = function inc(sourceServiceName, destServiceName) {
var counterName = sourceServiceName + "~~" + destServiceName;
if (!this.curr[counterName]) {
this.aCounters[counterName] = 0;
this.bCounters[counterName] = 0;
this.curr[counterName] = 1;
this.counterKeys.push(counterName);
} else {
this.curr[counterName] += 1;
}
};

RPSCounters.prototype.swap = function swap() {
if (this.curr === this.aCounters) {
this.curr = this.bCounters;
} else {
this.curr = this.aCounters;
}

var i;
for (i = 0; i < this.counterKeys.length; i++) {
this.curr[this.counterKeys[i]] = 0;
}
};
9 changes: 9 additions & 0 deletions service-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var IntervalScan = require('./lib/interval-scan.js');
var RateLimiter = require('./rate_limiter.js');
var PartialRange = require('./partial_range.js');
var Circuits = require('./circuits.js');
var RPSCounters = require('./rps_counters.js');

var MAX_AFFINITY_AUDIT_ROUNDS = 3;

Expand Down Expand Up @@ -275,6 +276,9 @@ function ServiceDispatchHandler(options) {
self.enableCircuits();
}

self.rpsCounters = new RPSCounters(self.channel.timers);
self.rpsCounters.bootstrap();

function onEgressNodesChanged() {
setImmediate(updateServiceChannels);
}
Expand Down Expand Up @@ -344,6 +348,8 @@ function handleLazily(conn, reqFrame) {

var callerName = reqFrame.bodyRW.lazy.readCallerNameStr(reqFrame);

self.rpsCounters.inc(callerName, nextService);

if (!callerName) {
self.channel.logger.warn(
'request missing cn header',
Expand Down Expand Up @@ -456,6 +462,8 @@ function handleRequest(req, buildRes) {
var routingDelegate = req.headers && req.headers.rd;
var nextService = routingDelegate || req.serviceName;

self.rpsCounters.inc(req.headers.cn, nextService);

if (self.isBlocked(req.headers && req.headers.cn, nextService)) {
req.operations.popInReq(req.id);
return;
Expand Down Expand Up @@ -1490,6 +1498,7 @@ function destroy() {
self.servicePurger.stop();
self.statEmitter.stop();
self.rateLimiter.destroy();
self.rpsCounters.destroy();
};

ServiceDispatchHandler.prototype.initCircuits =
Expand Down
120 changes: 120 additions & 0 deletions test/rps_counters.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

var test = require('tape');
var RPSCounters = require('../rps_counters');
var Timer = require('time-mock');
var allocCluster = require('./lib/test-cluster.js');
var parallel = require('run-parallel');

test('rps counters', function t1(assert) {
var timer = Timer(0);
var counters = new RPSCounters(timer);
counters.bootstrap();

counters.inc('rtapi', 'octane');
assert.equals(counters.curr['rtapi~~octane'], 1);

counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
assert.equals(counters.curr['rtapi~~octane'], 5);

timer.advance(10 * 1000);
assert.equals(counters.curr['rtapi~~octane'], 0);

counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
counters.inc('rtapi', 'octane');
assert.equals(counters.curr['rtapi~~octane'], 4);

timer.advance(10 * 1000);
assert.equals(counters.curr['rtapi~~octane'], 0);

counters.inc('rtapi', 'abacus');
counters.inc('rtapi', 'abacus');
assert.equals(counters.curr['rtapi~~abacus'], 2);

timer.advance(10 * 1000);
assert.equals(counters.curr['rtapi~~octane'], 0);
assert.equals(counters.curr['rtapi~~abacus'], 0);

var counts = counters.getCounts();
assert.equals(counts['rtapi~~abacus'], 2);
assert.equals(counts['rtapi~~octane'], 0);

assert.end();
});

allocCluster.test('forwarding small timeout concurrently', {
size: 2,
dummySize: 2,
namedRemotes: ['mary']
}, function t(cluster, assert) {
var steve = cluster.remotes.steve;
var bob = cluster.remotes.bob;

var mary = cluster.namedRemotes[0];

cluster.checkExitPeers(assert, {
serviceName: steve.serviceName,
hostPort: steve.hostPort
});

steve.serverChannel.register('m', function m(req, res) {
res.headers.as = 'raw';
res.sendOk(null, 'oh hi');
});

parallel([
send(bob.clientChannel),
send(mary.clientChannel)
], function onResults(err2, results) {
assert.ifError(err2);

assert.equal(results[0].ok, true);
assert.equal(results[1].ok, true);

var rpsCounter0 = cluster.apps[0].clients.serviceProxy.rpsCounters;
var rpsCounter1 = cluster.apps[1].clients.serviceProxy.rpsCounters;

assert.ok(
rpsCounter0.curr['mary~~steve'] === 1 ||
rpsCounter1.curr['mary~~steve'] === 1);
assert.ok(
rpsCounter0.curr['bob~~steve'] === 1 ||
rpsCounter1.curr['bob~~steve'] === 1);

assert.end();
});
});

function send(chan) {
return function thunk(cb) {
chan.request({
serviceName: 'steve',
timeout: 300
}).send('m', null, null, cb);
};
}

0 comments on commit 133597d

Please sign in to comment.