Skip to content

Commit

Permalink
Rewriting Sphinx connection and querying in non-blocking model
Browse files Browse the repository at this point in the history
  • Loading branch information
kurokikaze committed Jan 13, 2010
1 parent 52c0691 commit e1e36ad
Showing 1 changed file with 53 additions and 14 deletions.
67 changes: 53 additions & 14 deletions limestone.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ var Sphinx = {
};

(function() {
// var Sphinx.port = 9312;

Sphinx.queries = [];

// All search modes
Expand Down Expand Up @@ -89,14 +87,20 @@ var Sphinx = {
"MULTI": 1073741824 // 0x40000000
}

sys.puts('Connecting to searchd...');
var server_conn;
var connection_status;


var server_conn = tcp.createConnection(Sphinx.port);
// Connect to Sphinx server
Sphinx.connect = function(port, callback) {

server_conn = tcp.createConnection(Sphinx.port);
// disable Nagle algorithm
server_conn.setNoDelay(true);
server_conn.setEncoding('binary');

var promise = new process.Promise();

server_conn.addListener('connect', function () {
// Sending protocol version
sys.puts('Sending version number...');
Expand All @@ -114,9 +118,32 @@ var Sphinx = {
var protocol_version = (new bits.Decoder(data)).shift_int32();
var data_unpacked = {'': 1};

var composeQuery = function(query) {
// Header
if (data_unpacked[""] >= 1) {

// Remove listener after handshaking
for (listener in server_conn.listeners('receive')) {
server_conn.removeListener('receive', listener);
}

// Use callback
promise.emitSuccess();

} else {
promise.emitError('Wrong protocol version');
}

});
});
promise.addCallback(callback);
return promise;
}

sys.puts('Connecting to searchd...');



Sphinx.query = function(query, callback) {
// Header

var request = (new bits.Encoder(0, Sphinx.clientCommand.SEARCH)).push_int32(0).push_int32(20).push_int32(Sphinx.searchMode.ALL).push_int32(Sphinx.rankingMode.BM25).push_int32(Sphinx.sortMode.RELEVANCE);

Expand Down Expand Up @@ -168,10 +195,12 @@ var Sphinx = {
server_conn.send(request.toString(), 'binary');

sys.puts('Request sent: [' + request.toString().length + ']');
var x;
for (x = 0; x < request.toString().length; x++) {
sys.puts(x + ':' + request.toString().charCodeAt(x).toString(16));
}
//var x;
//for (x = 0; x < request.toString().length; x++) {
// sys.puts(x + ':' + request.toString().charCodeAt(x).toString(16));
//}

var promise = new process.Promise();

server_conn.addListener('receive', function(data) {
// Got response!
Expand All @@ -181,8 +210,15 @@ var Sphinx = {

var answer = parseSearchResponse(response);

sys.puts('Answer data:' + JSON.stringify(answer));
promise.emitSuccess();
});

if (callback) {
promise.addCallback(callback);
}

return promise;

};

var getResponse = function(data, search_command) {
Expand Down Expand Up @@ -298,12 +334,13 @@ var Sphinx = {
output.msecs = response.shift_int32();
output.words = response.shift_int32();

// @todo: implement
// @todo: implement words

return output;
}

sys.puts('Server data received: ' + protocol_version);
// Old code
/* sys.puts('Server data received: ' + protocol_version);
if (data_unpacked[""] >= 1) {

// Remove listener after handshaking
Expand All @@ -322,4 +359,6 @@ var Sphinx = {
}
});
});
})();
})();

process.mixin(exports, Sphinx);

0 comments on commit e1e36ad

Please sign in to comment.