Skip to content

Commit

Permalink
fix multiple receiver with TCPROS
Browse files Browse the repository at this point in the history
  • Loading branch information
maxired committed Apr 29, 2012
1 parent 6839c1d commit dd2e9de
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 96 deletions.
68 changes: 35 additions & 33 deletions lib/tcpros.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@ TCPROS.prototype.createPublisher = function() {
;

var server = net.createServer(function(socket) {
// that.sockets.push(socket);
// var removeSocketFromSockets = function(socket) {
// var index = that.sockets.indexOf(socket);
// if (index>-1) {
// that.sockets.splice(index, 1);
// }
// }
that.socket = socket;
that.sockets.push(socket);
var removeSocketFromSockets = function(socket) {
var index = that.sockets.indexOf(socket);
if (index>-1) {
that.sockets.splice(index, 1);
}
}
socket.on('end', function() {
// removeSocketFromSockets(socket);
removeSocketFromSockets(socket);
});
socket.on('error', function() {
// removeSocketFromSockets(socket);
removeSocketFromSockets(socket);
});

socket.on('data', function(data) {
Expand Down Expand Up @@ -95,23 +94,26 @@ TCPROS.prototype.createSubscriber = function(port, host, subscriber) {
};

TCPROS.prototype.publish = function(message) {
var publish = function() {
console.log("Publishc called fpor mesage : " , message);
var that=this;
var publish = function(message) {
var messageBuffer = serializeMessage(message);
this.socket.write(messageBuffer);
};

if (!this.socket) {
this.once('connect', publish);
if (!this.sockets.length) {
this.once('connect', function(){
//that.publish(message);
});
}
else {
publish();
var self=this;
async.forEach( this.sockets, function(socket, callback){
var messageBuffer = serializeMessage(message);
socket.write(messageBuffer);
callback();
});
}
// var self=this;
// async.forEach( sockets, function(socket, callback){
// var messageBuffer = serializeMessage(message);
// socket.write(messageBuffer);
// callback();
// }, callback);
};

function deserializeConnectionHeader(buffer) {
Expand All @@ -124,12 +126,12 @@ function deserializeConnectionHeader(buffer) {
var fieldLength = buffer.readUInt32LE(bufferOffset);
bufferOffset += 4;
var fieldStart = bufferOffset
, fieldEnd = fieldStart + fieldLength
, field = buffer.toString('utf8', fieldStart, fieldEnd)
, fieldComponents = field.split('=')
, fieldName = fieldComponents[0]
, fieldValue = fieldComponents[1]
;
, fieldEnd = fieldStart + fieldLength
, field = buffer.toString('utf8', fieldStart, fieldEnd)
, fieldComponents = field.split('=')
, fieldName = fieldComponents[0]
, fieldValue = fieldComponents[1]
;
bufferOffset += fieldLength;

if (fieldName === 'callerid') {
Expand Down Expand Up @@ -245,10 +247,10 @@ function serializeInnerMessage(message, buffer, bufferOffset) {

function deserializeMessage(buffer, messageType) {
var message = new messageType()
, headerLength = 0
, isConnectionHeader = false
, messageLength = buffer.readUInt32LE(0)
, bufferOffset = 4
, headerLength = 0
, isConnectionHeader = false
, messageLength = buffer.readUInt32LE(0)
, bufferOffset = 4

// If the initial length is less than the buffer, then a connection header was
// sent first as part of this buffer.
Expand Down Expand Up @@ -284,9 +286,9 @@ function deserializeInnerMessage(message, buffer, bufferOffset) {
}
else if (fieldsUtil.isArray(field.type)) {
var array = []
, arraySize = buffer.readUInt32LE(bufferOffset)
, arrayType = fieldsUtil.getTypeOfArray(field.type)
;
, arraySize = buffer.readUInt32LE(bufferOffset)
, arrayType = fieldsUtil.getTypeOfArray(field.type)
;
bufferOffset += 4;

for (var i = 0; i < arraySize; i++) {
Expand Down
133 changes: 76 additions & 57 deletions lib/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ function Topic(options) {
this.node = options.node;
this.topic = options.topic;
this.messageType = options.messageType;
this.mode = options.mode || "all";
this.uri = null;
this.registeredPublisher = false;
this.registeredSubscriber = false;
this.protocols = [];
this.protocols = {};

this.createSlaveServer();
}
Expand Down Expand Up @@ -53,68 +54,104 @@ Topic.prototype.createSlaveServer = function() {

that.emit('connection', uri);
});

if(this.mode=="all"||this.mode=="publish"){
this._registerPublisher();
}
};

Topic.prototype._registerPublisher=function(){
var that=this;

this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
});
});
};

Topic.prototype.requestTopic = function(error, params, callback) {
console.log("topic requested");
var that = this
, callerId = params[0]
, topic = params[1]
, protocols = params[2]
;
, callerId = params[0]
, topic = params[1]
, protocols = params[2]
, protocol;

if (topic.length > 0 && topic.charAt(0) === '/') {
topic = topic.substr(1, topic.length - 1);
}

this.protocol = new TCPROS({
node : this.node
, topic : this.topic
, messageType : this.messageType
});
this.protocol.on('listening', function(uri) {
that.emit('publisher_ready', that.protocol);

var protocolListening=function(uri){
console.log("Uri ", uri);
var statusCode = 1
, statusMessage = 'ready on ' + uri
, uriFields = url.parse(uri)
, hostname = uriFields.hostname
, port = parseInt(uriFields.port)
, protocolParams = ['TCPROS', hostname, port]
;
, statusMessage = 'ready on ' + uri
, uriFields = url.parse(uri)
, hostname = uriFields.hostname
, port = parseInt(uriFields.port)
, protocolParams = ['TCPROS', hostname, port]
;
callback(null, [statusCode, statusMessage, protocolParams]);
});
this.protocol.createPublisher();
};



if(!this.protocols['TCPROS']){
protocol = new TCPROS({
node : this.node
, topic : this.topic
, messageType : this.messageType
});
protocol.on('listening', function(uri) {
that.protocols['TCPROS']={protocol:protocol, uri:uri};
that.emit('publisher_ready', protocol);
protocolListening(uri);
});
protocol.createPublisher();
}else{
protocolListening(that.protocols['TCPROS'].uri);
}

};

Topic.prototype.publisherUpdate = function(error, params, callback) {
var that = this
, callerId = params[0]
, topic = params[1]
, publishers = params[2]
;
, callerId = params[0]
, topic = params[1]
, publishers = params[2]
;

if (topic.length > 0 && topic.charAt(0) === '/') {
topic = topic.substr(1, topic.length - 1);
}

publishers.forEach(function(publisherUri) {
var client = xmlrpc.createClient(publisherUri)
, protocols = [['TCPROS']]
, params = [that.node, that.topic, protocols]
;
, protocols = [['TCPROS']]
, params = [that.node, that.topic, protocols]
;

client.methodCall('requestTopic', params, function(error, value) {
var hostParams = value[2]
, protocol = hostParams[0]
, host = hostParams[1]
, port = hostParams[2]
;
, protocol = hostParams[0]
, host = hostParams[1]
, port = hostParams[2]
;

if (protocol === 'TCPROS') {
that.protocol = new TCPROS({
node : that.node
, topic : that.topic
, messageType : that.messageType
, topic : that.topic
, messageType : that.messageType
});

that.protocol.on('message', function(message) {
Expand All @@ -129,30 +166,12 @@ Topic.prototype.publisherUpdate = function(error, params, callback) {

Topic.prototype.publish = function(message) {
var that = this;

if (this.protocol) {
this.protocol.publish();
console.log("publish on topic");
for( i in this.protocols){
console.log("looping for each");
this.protocols[i].protocol.publish(message);
}
else {
this.on('publisher_ready', function(protocol) {
protocol.publish(message);
});

this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
});
});
}
};
}

Topic.prototype.subscribe = function(callback) {
var that = this;
Expand All @@ -162,7 +181,7 @@ Topic.prototype.subscribe = function(callback) {
if (!this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
Expand Down
13 changes: 7 additions & 6 deletions sendingTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ var exec = require('child_process').exec
ros.types([
'std_msgs/String'
], function(String) {
var node = new ros.node('talker');
node.topics([
{ topic: 'hello_world', messageType: String }
], {mode:"publish"}, function pub(publishExample) {
var topic = new ros.topic(
{ topic: 'hello_world', messageType: String ,node:'talker'}
);

function pub(publishExample) {
var message = new String({ data: 'howdy' });
console.log("sending ",message);
publishExample.publish(message, function(){ console.log("emitted");});
};

setTimeout(function(){pub(publishExample);}, 2000);
setInterval(function(){pub(topic);}, 2000);

});
});

0 comments on commit dd2e9de

Please sign in to comment.