Skip to content

Commit

Permalink
THRIFT-2976: add browserify support and tests
Browse files Browse the repository at this point in the history
Client: nodejs
Patch: Andrew de Andrade
  • Loading branch information
RandyAbernethy committed Feb 16, 2015
1 parent 3b9ff4d commit d8187c5
Show file tree
Hide file tree
Showing 16 changed files with 836 additions and 248 deletions.
34 changes: 34 additions & 0 deletions lib/nodejs/lib/thrift/browser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
exports.Thrift = require('./thrift');

var xhrConnection = require('./xhr_connection');
exports.XHRConnection = xhrConnection.XHRConnection;
exports.createXHRConnection = xhrConnection.createXHRConnection;
exports.createXHRClient = xhrConnection.createXHRClient;

exports.Multiplexer = require('./multiplexed_protocol').Multiplexer;

exports.TWebSocketTransport = require('./ws_transport');
exports.TBufferedTransport = require('./buffered_transport');
exports.TFramedTransport = require('./framed_transport');

exports.Protocol = exports.TJSONProtocol = require('./json_protocol');
exports.TBinaryProtocol = require('./binary_protocol');
exports.TCompactProtocol = require('./compact_protocol');
5 changes: 5 additions & 0 deletions lib/nodejs/lib/thrift/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ exports.WSConnection = wsConnection.WSConnection;
exports.createWSConnection = wsConnection.createWSConnection;
exports.createWSClient = wsConnection.createWSClient;

var xhrConnection = require('./xhr_connection');
exports.XHRConnection = xhrConnection.XHRConnection;
exports.createXHRConnection = xhrConnection.createXHRConnection;
exports.createXHRClient = xhrConnection.createXHRClient;

var server = require('./server');
exports.createServer = server.createServer;
exports.createMultiplexServer = server.createMultiplexServer;
Expand Down
15 changes: 4 additions & 11 deletions lib/nodejs/lib/thrift/json_protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ module.exports = TJSONProtocol;
* var protocol = new Thrift.Protocol(transport);
*/
function TJSONProtocol(trans) {
this.tstack = [];
this.tpos = [];
this.trans = trans;
};

Expand Down Expand Up @@ -607,15 +609,7 @@ TJSONProtocol.prototype.readSetEnd = function() {
* False unless the next number in the protocol buffer
* is 1, in which case the value property is True */
TJSONProtocol.prototype.readBool = function() {
var r = this.readI32();

if (r !== null && r.value == '1') {
r.value = true;
} else {
r.value = false;
}

return r;
return this.readI32() == '1';
};

/** Returns the an object with a value property set to the
Expand Down Expand Up @@ -685,8 +679,7 @@ TJSONProtocol.prototype.readBinary = function() {
/** Returns the an object with a value property set to the
next value found in the protocol buffer */
TJSONProtocol.prototype.readString = function() {
var r = this.readI32();
return r;
return this.readI32();
};

/**
Expand Down
1 change: 0 additions & 1 deletion lib/nodejs/lib/thrift/multiplexed_protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
var util = require('util');
var Thrift = require('./thrift');

exports.Wrapper = Wrapper;
exports.Multiplexer = Multiplexer;

function Wrapper(serviceName, protocol, connection) {
Expand Down
2 changes: 2 additions & 0 deletions lib/nodejs/lib/thrift/web_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ exports.createWebServer = function(options) {
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" +
"Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" +
"Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" +
"\r\n");
//Handle WebSocket traffic
var data = null;
Expand Down
4 changes: 3 additions & 1 deletion lib/nodejs/lib/thrift/ws_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var InputBufferUnderrunError = require('./input_buffer_underrun_error');

var createClient = require('./create_client');

exports.WSConnection = WSConnection;

/**
* @class
* @name WSConnectOptions
Expand Down Expand Up @@ -73,7 +75,7 @@ var createClient = require('./create_client');
* semantics implemented using Websockets.
* @see {@link createWSConnection}
*/
var WSConnection = exports.WSConnection = function(host, port, options) {
function WSConnection(host, port, options) {
//Initialize the emitter base object
EventEmitter.call(this);

Expand Down
204 changes: 204 additions & 0 deletions lib/nodejs/lib/thrift/ws_transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

module.exports = TWebSocketTransport;

/**
* Constructor Function for the WebSocket transport.
* @constructor
* @param {string} [url] - The URL to connect to.
* @classdesc The Apache Thrift Transport layer performs byte level I/O
* between RPC clients and servers. The JavaScript TWebSocketTransport object
* uses the WebSocket protocol. Target servers must implement WebSocket.
* (see: node.js example server_http.js).
* @example
* var transport = new Thrift.TWebSocketTransport("http://localhost:8585");
*/
function TWebSocketTransport(url) {
this.__reset(url);
};


TWebSocketTransport.prototype.__reset = function(url) {
this.url = url; //Where to connect
this.socket = null; //The web socket
this.callbacks = []; //Pending callbacks
this.send_pending = []; //Buffers/Callback pairs waiting to be sent
this.send_buf = ''; //Outbound data, immutable until sent
this.recv_buf = ''; //Inbound data
this.rb_wpos = 0; //Network write position in receive buffer
this.rb_rpos = 0; //Client read position in receive buffer
};

/**
* Sends the current WS request and registers callback. The async
* parameter is ignored (WS flush is always async) and the callback
* function parameter is required.
* @param {object} async - Ignored.
* @param {object} callback - The client completion callback.
* @returns {undefined|string} Nothing (undefined)
*/
TWebSocketTransport.prototype.flush = function(async, callback) {
var self = this;
if (this.isOpen()) {
//Send data and register a callback to invoke the client callback
this.socket.send(this.send_buf);
this.callbacks.push((function() {
var clientCallback = callback;
return function(msg) {
self.setRecvBuffer(msg);
clientCallback();
};
}()));
} else {
//Queue the send to go out __onOpen
this.send_pending.push({
buf: this.send_buf,
cb: callback
});
}
};

TWebSocketTransport.prototype.__onOpen = function() {
var self = this;
if (this.send_pending.length > 0) {
//If the user made calls before the connection was fully
//open, send them now
this.send_pending.forEach(function(elem) {
this.socket.send(elem.buf);
this.callbacks.push((function() {
var clientCallback = elem.cb;
return function(msg) {
self.setRecvBuffer(msg);
clientCallback();
};
}()));
});
this.send_pending = [];
}
};

TWebSocketTransport.prototype.__onClose = function(evt) {
this.__reset(this.url);
};

TWebSocketTransport.prototype.__onMessage = function(evt) {
if (this.callbacks.length) {
this.callbacks.shift()(evt.data);
}
};

TWebSocketTransport.prototype.__onError = function(evt) {
console.log("Thrift WebSocket Error: " + evt.toString());
this.socket.close();
};

/**
* Sets the buffer to use when receiving server responses.
* @param {string} buf - The buffer to receive server responses.
*/
TWebSocketTransport.prototype.setRecvBuffer = function(buf) {
this.recv_buf = buf;
this.recv_buf_sz = this.recv_buf.length;
this.wpos = this.recv_buf.length;
this.rpos = 0;
};

/**
* Returns true if the transport is open
* @readonly
* @returns {boolean}
*/
TWebSocketTransport.prototype.isOpen = function() {
return this.socket && this.socket.readyState == this.socket.OPEN;
};

/**
* Opens the transport connection
*/
TWebSocketTransport.prototype.open = function() {
//If OPEN/CONNECTING/CLOSING ignore additional opens
if (this.socket && this.socket.readyState != this.socket.CLOSED) {
return;
}
//If there is no socket or the socket is closed:
this.socket = new WebSocket(this.url);
this.socket.onopen = this.__onOpen.bind(this);
this.socket.onmessage = this.__onMessage.bind(this);
this.socket.onerror = this.__onError.bind(this);
this.socket.onclose = this.__onClose.bind(this);
};

/**
* Closes the transport connection
*/
TWebSocketTransport.prototype.close = function() {
this.socket.close();
};

/**
* Returns the specified number of characters from the response
* buffer.
* @param {number} len - The number of characters to return.
* @returns {string} Characters sent by the server.
*/
TWebSocketTransport.prototype.read = function(len) {
var avail = this.wpos - this.rpos;

if (avail === 0) {
return '';
}

var give = len;

if (avail < len) {
give = avail;
}

var ret = this.read_buf.substr(this.rpos, give);
this.rpos += give;

//clear buf when complete?
return ret;
};

/**
* Returns the entire response buffer.
* @returns {string} Characters sent by the server.
*/
TWebSocketTransport.prototype.readAll = function() {
return this.recv_buf;
};

/**
* Sets the send buffer to buf.
* @param {string} buf - The buffer to send.
*/
TWebSocketTransport.prototype.write = function(buf) {
this.send_buf = buf;
};

/**
* Returns the send buffer.
* @readonly
* @returns {string} The send buffer.
*/
TWebSocketTransport.prototype.getSendBuffer = function() {
return this.send_buf;
};
Loading

0 comments on commit d8187c5

Please sign in to comment.