Skip to content

Commit

Permalink
Regex matching MQTT server, crashes occasionally
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Rudd committed Mar 15, 2011
1 parent c659f3c commit 0a77a5f
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var EventEmitter = require("events").EventEmitter;
* (but the lifeboats make the deck look crowded!)
* 8. Standardise packet construction i.e. make functions that
* return packets rather than generating them in client.sendPacket
* 9. Catch error events from sockets and process them rather than
* letting them bubble up to the reactor
*/

MQTTPacketType = {'Connect':1, 'Connack':2,
Expand Down Expand Up @@ -452,31 +454,45 @@ s.on('new_client', function(client) {
});

client.on('publish', function(packet) {
for(var i = 0; i < list.length; i++) {
/* Don't publish to the publisher */
if(list[i] !== client) {
/* For each client, get a list of sub topics, sans QoS */
var subs = list[i].subscriptions.reduce(function(a,b) {
return a.concat(b.topic);
}, []);
/* If the packet's topic is in the list of sub topics, publish the message to the client */
/* Arghblblarhg */
if(subs.indexOf(packet.topic) != -1) {
list[i].publish(packet.topic, packet.payload);
/* Filter the publisher */
/* This is kind of dumb. There has to be a better way */
var fList = list.filter(function(x) {
if(x === client) {
return false;
} else {
return true;
}
});
for(var i = 0; i < fList.length; i++) {
var curClient = fList[i];
/* For each of our subscription regexes */
for(var j = 0; j < curClient.subscriptions.length; j++) {
/* If the regex matches the published topic */
var sub = curClient.subscriptions[j];
if(sub.test(packet.topic)) {
/* Publish the message */
curClient.publish(packet.topic, packet.payload);
}
}
}
});

client.on('subscribe', function(packet) {
sys.log(inspect(packet));
/*
if(client.subscriptions === undefined) {
client.subscriptions = packet.subscriptions;
} else {
client.subscriptions = client.subscriptions.concat(packet.subscriptions);
}
*/

for(var i = 0; i < packet.subscriptions.length; i++) {
client.subscriptions.push(new RegExp(packet.subscriptions[i].topic));
}

/* Give 'em whatever they want */
/* Hello flaw in the protocol! */
var qos = [];
for(var i = 0; i < packet.subscriptions; i++) {
qos.push(packet.subscriptions[i].qos);
Expand All @@ -491,5 +507,12 @@ s.on('new_client', function(client) {

client.on('disconnect', function() {
this.socket.end();
list = list.filter(function(x) {
if(x === client) {
return false;
} else {
return true;
}
});
});
});

0 comments on commit 0a77a5f

Please sign in to comment.