Skip to content

Commit

Permalink
use candleBatcher in tradingAdvisor
Browse files Browse the repository at this point in the history
  • Loading branch information
askmike committed Jul 4, 2015
1 parent 1d810f6 commit cf3c565
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 118 deletions.
4 changes: 2 additions & 2 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ config.watch = {
exchange: 'Bitstamp', // 'MtGox', 'BTCe', 'Bitstamp', 'cexio' or 'kraken'
currency: 'USD',
asset: 'BTC',
interval: 20, // seconds between fetches
interval: 20, // seconds between fetches TODO: currently unused
}

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -36,7 +36,7 @@ config.tradingAdvisor = {
enabled: true,
method: 'DEMA',
candleSize: 3,
historySize: 50
historySize: 10
}

// Exponential Moving Averages settings:
Expand Down
6 changes: 3 additions & 3 deletions core/candleBatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
// convert them to any desired
// size.

// Acts as stream: takes
// 1m candles as input
// and emits bigger candles
// Acts as ~fake~ stream: takes
// 1m candles as input and emits
// bigger candles.
//
// input are transported candles.

Expand Down
11 changes: 6 additions & 5 deletions core/gekkoStream.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Small writable stream wrapper that
// passes data to all `candleProcessors`.
// passes data to all `candleConsumers`.

var Writable = require('stream').Writable;
var _ = require('lodash');

var Gekko = function(candleProcessors) {
this.candleProcessors = candleProcessors;
var Gekko = function(candleConsumers) {
this.candleConsumers = candleConsumers;
Writable.call(this, {objectMode: true});
}

Expand All @@ -14,8 +14,9 @@ Gekko.prototype = Object.create(Writable.prototype, {
});

Gekko.prototype._write = function(chunk, encoding, callback) {
_.each(this.candleProcessors, function(p) {
p.processCandle(chunk);
// TODO: use different ticks and pause until all are done
_.each(this.candleConsumers, function(c) {
c.processCandle(chunk);
});

callback();
Expand Down
30 changes: 0 additions & 30 deletions core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,36 +83,6 @@ var util = {
var total = _.reduce(list, function(m, n) { return m + n }, 0);
return total / list.length;
},
// calculate the average trade price out of a sample of trades.
// The sample consists of all trades that happened after the treshold.
calculatePriceSince: function(treshold, trades) {
var sample = [];
_.every(trades, function(trade) {
if(moment.unix(trade.date) < treshold)
return false;

var price = parseFloat(trade.price);
sample.push(price);
return true;
});

return util.average(sample);
},
// calculate the average trade price out of a sample of trades.
// The sample consists of all trades that happened before the treshold.
calculatePriceTill: function(treshold, trades) {
var sample = [];
_.every(trades, function(trade) {
if(moment.unix(trade.date) > treshold)
return false;

var price = parseFloat(trade.price);
sample.push(price);
return true;
});

return util.average(sample);
},
calculateTimespan: function(a, b) {
if(a < b)
return b.diff(a);
Expand Down
112 changes: 58 additions & 54 deletions exchanges.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,60 +30,6 @@
// tradeError: If gekko is currently not able to trade at this exchange, please set it
// to an URL explaining the problem.
var exchanges = [
{
name: 'MtGox',
slug: 'mtgox',
direct: true,
infinityOrder: true,
currencies: [
'USD', 'EUR', 'GBP', 'AUD', 'CAD', 'CHF', 'CNY',
'DKK', 'HKD', 'PLN', 'RUB', 'SGD', 'THB'
],
assets: ['BTC'],
markets: [
{
pair: ['USD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['EUR', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['GBP', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['AUD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['CAD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['CHF', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['CNY', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['DKK', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['HKD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['PLN', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['RUB', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['SGD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
},
{
pair: ['THB', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
}
],
requires: ['key', 'secret'],
providesHistory: false
},
{
name: 'BTC-e',
slug: 'btce',
Expand Down Expand Up @@ -161,6 +107,7 @@ var exchanges = [
infinityOrder: false,
currencies: ['USD'],
assets: ['BTC'],
maxTradesAge: 60,
markets: [
{
pair: ['USD', 'BTC'], minimalOrder: { amount: 1, unit: 'currency' }
Expand Down Expand Up @@ -264,6 +211,63 @@ var exchanges = [
providesHistory: false,
tid: 'date'
}
// ,
// ---- Keeping this here for historical purposes. ----
// {
//
// name: 'MtGox',
// slug: 'mtgox',
// direct: true,
// infinityOrder: true,
// currencies: [
// 'USD', 'EUR', 'GBP', 'AUD', 'CAD', 'CHF', 'CNY',
// 'DKK', 'HKD', 'PLN', 'RUB', 'SGD', 'THB'
// ],
// assets: ['BTC'],
// markets: [
// {
// pair: ['USD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['EUR', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['GBP', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['AUD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['CAD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['CHF', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['CNY', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['DKK', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['HKD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['PLN', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['RUB', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['SGD', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// },
// {
// pair: ['THB', 'BTC'], minimalOrder: { amount: 0.01, unit: 'asset' }
// }
// ],
// requires: ['key', 'secret'],
// providesHistory: false
// }
];

module.exports = exchanges;
64 changes: 45 additions & 19 deletions gekko.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,28 @@ var plugins = [];
// all emitting plugins
var emitters = {};
// all plugins interested in candles
var candleProcessors = [];
var candleConsumers = [];

// TODO, this is not the proper place to do this.
var exchanges = require(dirs.gekko + 'exchanges');
var exchange = _.find(exchanges, function(e) {
return e.name === config.watch.exchange;
});
// Update tradingAdvisor.historySize if the exchange is able to send more data.
var requiredHistory = config.tradingAdvisor.candleSize * config.tradingAdvisor.historySize;
console.log(requiredHistory,exchange.maxTradesAge)
if(requiredHistory < exchange.maxTradesAge) {
var properHistorySize = Math.ceil(
exchange.maxTradesAge / config.tradingAdvisor.candleSize
) - 1;

log.debug('Overwriting historySize to', properHistorySize, 'due to exchange.')
util.setConfigProperty(
'tradingAdvisor',
'historySize',
properHistorySize
);
}

// Instantiate each enabled plugin
var loadPlugins = function(next) {
Expand Down Expand Up @@ -118,7 +139,6 @@ var referenceEmitters = function(next) {
}

var subscribePlugins = function(next) {

var subscriptions = require(dirs.gekko + 'subscriptions');

// events broadcasted by plugins
Expand Down Expand Up @@ -157,28 +177,29 @@ var subscribePlugins = function(next) {
}

});
});

// events broadcasted by the market
var marketSubscriptions = _.filter(
subscriptions,
{emitter: 'market'}
);

// events broadcasted by the market
var marketSubscriptions = _.filter(
subscriptions,
{emitter: 'market'}
);
// subscribe plugins to the market
_.each(plugins, function(plugin) {
_.each(marketSubscriptions, function(sub) {

// subscribe plugins to the market
_.each(plugins, function(plugin) {
_.each(marketSubscriptions, function(sub) {
// for now, only subscribe to candles
if(sub.event !== 'candle')
return;

// for now, only subscribe to candles
if(sub.event !== 'candle')
return;
if(_.has(plugin, sub.handler))
candleConsumers.push(plugin);

if(_.has(plugin, sub.handler))
candleProcessors.push(plugin);
});
});

});

console.log(candleConsumers.length);
next();
}

Expand All @@ -196,8 +217,13 @@ async.series(
// everything is setup!

market = new Market(config.watch)
// .start()
.pipe(new GekkoStream(candleProcessors));
.start()
.pipe(new GekkoStream(candleConsumers));

// convert JS objects to JSON string
// .pipe(new require('stringify-stream')())
// output to standard out
// .pipe(process.stdout);

}
);
1 change: 0 additions & 1 deletion methods/DEMA.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ method.log = function() {
}

method.check = function() {

var dema = this.indicators.dema;
var diff = dema.result;
var price = this.lastPrice;
Expand Down
17 changes: 13 additions & 4 deletions plugins/tradingAdvisor.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
var util = require('../core/util');
var dirs = util.dirs();
var log = require('../core/log');
var _ = require('lodash');

var config = util.getConfig();
var dirs = util.dirs();
var log = require(dirs.core + '/log');
var CandleBatcher = require(dirs.core + 'candleBatcher');

var methods = [
'MACD',
Expand All @@ -13,9 +14,9 @@ var methods = [
'custom'
];

// TODO: emit event

var Actor = function() {
this.batcher = new CandleBatcher(config.tradingAdvisor.candleSize);

_.bindAll(this);

var methodName = config.tradingAdvisor.method;
Expand All @@ -36,6 +37,8 @@ var Actor = function() {
});

this.method = new Consultant;
this.batcher
.on('candle', this.processCustomCandle)

this.method
.on('advice', this.relayAdvice);
Expand All @@ -44,7 +47,13 @@ var Actor = function() {
util.makeEventEmitter(Actor);

// HANDLERS
// process the 1m candles
Actor.prototype.processCandle = function(candle) {
this.batcher.write([candle]);
}

// propogate a custom sized candle to the trading method
Actor.prototype.processCustomCandle = function(candle) {
this.method.tick(candle);
}

Expand Down

0 comments on commit cf3c565

Please sign in to comment.