Skip to content

Commit

Permalink
Merge branch 'multipleClients' of https://github.com/maxired/rosnodejs
Browse files Browse the repository at this point in the history
Conflicts:
	lib/master.js
	lib/topic.js
  • Loading branch information
baalexander committed May 3, 2012
2 parents 7dee9b8 + af26a76 commit 221d667
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 160 deletions.
16 changes: 8 additions & 8 deletions lib/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ var xmlrpc = require('xmlrpc')
var master = exports

master.registerPublisher = function(publisher, callback) {
var masterUri = environment.getRosMasterUri()
, callerUri = publisher.callerUri
, callerId = getGraphResourceName(publisher.callerId)
, topicName = getGraphResourceName(publisher.topic)
, messageType = publisher.messageType
, params = [callerId, topicName, messageType, callerUri]
, client = xmlrpc.createClient(masterUri)
;
var masterUri = environment.getRosMasterUri()
, callerUri = publisher.callerUri
, callerId = getGraphResourceName(publisher.callerId)
, topicName = getGraphResourceName(publisher.topic)
, messageType = publisher.messageType
, params = [callerId, topicName, messageType, callerUri]
, client = xmlrpc.createClient(masterUri)
;

client.methodCall('registerPublisher', params, function(error, value) {
parseResponse(error, value, callback);
Expand Down
67 changes: 34 additions & 33 deletions lib/tcpros.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ TCPROS.prototype.createPublisher = function() {
portscanner.findAPortNotInUse(9000, null, hostname, function(error, port) {

var server = net.createServer(function(socket) {
// that.sockets.push(socket);
// var removeSocketFromSockets = function(socket) {
// var index = that.sockets.indexOf(socket);
// if (index>-1) {
// that.sockets.splice(index, 1);
// }
// }
that.socket = socket;
that.sockets.push(socket);
var removeSocketFromSockets = function(socket) {
var index = that.sockets.indexOf(socket);
if (index>-1) {
that.sockets.splice(index, 1);
}
}
socket.on('end', function() {
// removeSocketFromSockets(socket);
removeSocketFromSockets(socket);
});
socket.on('error', function() {
// removeSocketFromSockets(socket);
removeSocketFromSockets(socket);
});

socket.on('data', function(data) {
Expand Down Expand Up @@ -100,23 +99,25 @@ TCPROS.prototype.createSubscriber = function(port, host, subscriber) {
};

TCPROS.prototype.publish = function(message) {
var publish = function() {
var that=this;
var publish = function(message) {
var messageBuffer = serializeMessage(message);
this.socket.write(messageBuffer);
};

if (!this.socket) {
this.once('connect', publish);
if (!this.sockets.length) {
this.once('connect', function(){
//that.publish(message);
});
}
else {
publish();
var self=this;
async.forEach( this.sockets, function(socket, callback){
var messageBuffer = serializeMessage(message);
socket.write(messageBuffer);
callback();
});
}
// var self=this;
// async.forEach( sockets, function(socket, callback){
// var messageBuffer = serializeMessage(message);
// socket.write(messageBuffer);
// callback();
// }, callback);
};

function deserializeConnectionHeader(buffer) {
Expand All @@ -129,12 +130,12 @@ function deserializeConnectionHeader(buffer) {
var fieldLength = buffer.readUInt32LE(bufferOffset);
bufferOffset += 4;
var fieldStart = bufferOffset
, fieldEnd = fieldStart + fieldLength
, field = buffer.toString('utf8', fieldStart, fieldEnd)
, fieldComponents = field.split('=')
, fieldName = fieldComponents[0]
, fieldValue = fieldComponents[1]
;
, fieldEnd = fieldStart + fieldLength
, field = buffer.toString('utf8', fieldStart, fieldEnd)
, fieldComponents = field.split('=')
, fieldName = fieldComponents[0]
, fieldValue = fieldComponents[1]
;
bufferOffset += fieldLength;

if (fieldName === 'callerid') {
Expand Down Expand Up @@ -250,10 +251,10 @@ function serializeInnerMessage(message, buffer, bufferOffset) {

function deserializeMessage(buffer, messageType) {
var message = new messageType()
, headerLength = 0
, isConnectionHeader = false
, messageLength = buffer.readUInt32LE(0)
, bufferOffset = 4
, headerLength = 0
, isConnectionHeader = false
, messageLength = buffer.readUInt32LE(0)
, bufferOffset = 4

// If the initial length is less than the buffer, then a connection header was
// sent first as part of this buffer.
Expand Down Expand Up @@ -289,9 +290,9 @@ function deserializeInnerMessage(message, buffer, bufferOffset) {
}
else if (fieldsUtil.isArray(field.type)) {
var array = []
, arraySize = buffer.readUInt32LE(bufferOffset)
, arrayType = fieldsUtil.getTypeOfArray(field.type)
;
, arraySize = buffer.readUInt32LE(bufferOffset)
, arrayType = fieldsUtil.getTypeOfArray(field.type)
;
bufferOffset += 4;

for (var i = 0; i < arraySize; i++) {
Expand Down
Loading

0 comments on commit 221d667

Please sign in to comment.