THRIFT-2819
Client Node
Patch: Chi Vinh Le
Adds websocket client to Node with tests
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index ea7fde0..9b53dd0 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -31,6 +31,11 @@
exports.createHttpConnection = httpConnection.createHttpConnection;
exports.createHttpClient = httpConnection.createHttpClient;
+var wsConnection = require('./ws_connection');
+exports.WSConnection = wsConnection.WSConnection;
+exports.createWSConnection = wsConnection.createWSConnection;
+exports.createWSClient = wsConnection.createWSClient;
+
var server = require('./server');
exports.createServer = server.createServer;
exports.createMultiplexServer = server.createMultiplexServer;
diff --git a/lib/nodejs/lib/thrift/web_server.js b/lib/nodejs/lib/thrift/web_server.js
index 40fc1ae..926b72c 100644
--- a/lib/nodejs/lib/thrift/web_server.js
+++ b/lib/nodejs/lib/thrift/web_server.js
@@ -27,7 +27,7 @@
var TTransport = require('./transport');
var TBufferedTransport = require('./transport').TBufferedTransport;
var TBinaryProtocol = require('./protocol').TBinaryProtocol;
-
+var TJSONProtocol = require('./protocol').TJSONProtocol;
// WSFrame constructor and prototype
/////////////////////////////////////////////////////////////////////
@@ -452,11 +452,12 @@
///////////////////////////////////////////////////
function processWS(data, socket, svc, binEncoding) {
svc.transport.receiver(function(transportWithData) {
+ var binary = svc.protocol != TJSONProtocol;
var input = new svc.protocol(transportWithData);
var output = new svc.protocol(new svc.transport(undefined, function(buf) {
try {
var frame = wsFrame.encode(buf, null, binEncoding);
- socket.write(frame);
+ socket.write(frame, null, binary);
} catch (err) {
//TODO: Add better error processing
}
@@ -519,10 +520,11 @@
"\r\n");
//Handle WebSocket traffic
var data = null;
+ var binary = svc.protocol != TJSONProtocol;
socket.on('data', function(frame) {
try {
while (frame) {
- var result = wsFrame.decode(frame);
+ var result = wsFrame.decode(frame, null, binary);
//Prepend any existing decoded data
if (data) {
if (result.data) {
diff --git a/lib/nodejs/lib/thrift/ws_connection.js b/lib/nodejs/lib/thrift/ws_connection.js
new file mode 100644
index 0000000..54dd936
--- /dev/null
+++ b/lib/nodejs/lib/thrift/ws_connection.js
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var util = require('util');
+var WebSocket = require('ws');
+var EventEmitter = require("events").EventEmitter;
+var thrift = require('./thrift');
+var ttransport = require('./transport');
+var tprotocol = require('./protocol');
+
+/**
+ * @class
+ * @name WSConnectOptions
+ * @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc).
+ * @property {string} protocol - The Thrift serialization protocol to use (TJSONProtocol, etc.).
+ * @property {string} path - The URL path to connect to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.).
+ * @property {object} headers - A standard Node.js header hash, an object hash containing key/value
+ * pairs where the key is the header name string and the value is the header value string.
+ * @property {boolean} secure - True causes the connection to use wss, otherwise ws is used.
+ * @property {object} wsOptions - Options passed on to WebSocket.
+ * @example
+ * //Use a secured websocket connection
+ * // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic
+ * // to wss://thrift.example.com:9090/hello
+ * var thrift = require('thrift');
+ * var options = {
+ * transport: thrift.TBufferedTransport,
+ * protocol: thrift.TJSONProtocol,
+ * path: "/hello",
+ * secure: true
+ * };
+ * var con = thrift.createWSConnection("thrift.example.com", 9090, options);
+ * con.open()
+ * var client = thrift.createWSClient(myService, connection);
+ * client.myServiceFunction();
+ * con.close()
+ */
+
+/**
+ * Initializes a Thrift WSConnection instance (use createWSConnection() rather than
+ * instantiating directly).
+ * @constructor
+ * @param {string} host - The host name or IP to connect to.
+ * @param {number} port - The TCP port to connect to.
+ * @param {WSConnectOptions} options - The configuration options to use.
+ * @throws {error} Exceptions other than ttransport.InputBufferUnderrunError are rethrown
+ * @event {error} The "error" event is fired when a Node.js error event occurs during
+ * request or response processing, in which case the node error is passed on. An "error"
+ * event may also be fired when the connectison can not map a response back to the
+ * appropriate client (an internal error), generating a TApplicationException.
+ * @classdesc WSConnection objects provide Thrift end point transport
+ * semantics implemented using Websockets.
+ * @see {@link createWSConnection}
+ */
+var WSConnection = exports.WSConnection = function(host, port, options) {
+ //Initialize the emitter base object
+ EventEmitter.call(this);
+
+ //Set configuration
+ var self = this;
+ this.options = options || {};
+ this.host = host;
+ this.port = port;
+ this.secure = this.options.secure || false;
+ this.transport = this.options.transport || ttransport.TBufferedTransport;
+ this.protocol = this.options.protocol || tprotocol.TJSONProtocol;
+ this.path = this.options.path;
+ this.send_pending = [];
+
+ //The sequence map is used to map seqIDs back to the
+ // calling client in multiplexed scenarios
+ this.seqId2Service = {};
+
+ //Prepare WebSocket options
+ this.wsOptions = {
+ host: this.host,
+ port: this.port || 80,
+ path: this.options.path || '/',
+ headers: this.options.headers || {}
+ };
+ for (var attrname in this.options.wsOptions) {
+ this.wsOptions[attrname] = this.options.wsOptions[attrname];
+ }
+};
+util.inherits(WSConnection, EventEmitter);
+
+WSConnection.prototype.__reset = function() {
+ this.socket = null; //The web socket
+ this.send_pending = []; //Buffers/Callback pairs waiting to be sent
+};
+
+WSConnection.prototype.__onOpen = function() {
+ var self = this;
+ this.emit("open");
+ if (this.send_pending.length > 0) {
+ //If the user made calls before the connection was fully
+ //open, send them now
+ this.send_pending.forEach(function(data) {
+ self.socket.send(data);
+ });
+ this.send_pending = [];
+ }
+};
+
+WSConnection.prototype.__onClose = function(evt) {
+ this.emit("close");
+ this.__reset();
+};
+
+WSConnection.prototype.__decodeCallback = function(transport_with_data) {
+ var proto = new this.protocol(transport_with_data);
+ try {
+ while (true) {
+ var header = proto.readMessageBegin();
+ var dummy_seqid = header.rseqid * -1;
+ var client = this.client;
+ //The Multiplexed Protocol stores a hash of seqid to service names
+ // in seqId2Service. If the SeqId is found in the hash we need to
+ // lookup the appropriate client for this call.
+ // The client var is a single client object when not multiplexing,
+ // when using multiplexing it is a service name keyed hash of client
+ // objects.
+ //NOTE: The 2 way interdependencies between protocols, transports,
+ // connections and clients in the Node.js implementation are irregular
+ // and make the implementation difficult to extend and maintain. We
+ // should bring this stuff inline with typical thrift I/O stack
+ // operation soon.
+ // --ra
+ var service_name = this.seqId2Service[header.rseqid];
+ if (service_name) {
+ client = this.client[service_name];
+ delete this.seqId2Service[header.rseqid];
+ }
+ /*jshint -W083 */
+ client._reqs[dummy_seqid] = function(err, success) {
+ transport_with_data.commitPosition();
+ var clientCallback = client._reqs[header.rseqid];
+ delete client._reqs[header.rseqid];
+ if (clientCallback) {
+ clientCallback(err, success);
+ }
+ };
+ /*jshint +W083 */
+ if (client['recv_' + header.fname]) {
+ client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
+ } else {
+ delete client._reqs[dummy_seqid];
+ this.emit("error",
+ new thrift.TApplicationException(
+ thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
+ "Received a response to an unknown RPC function"));
+ }
+ }
+ } catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ } else {
+ throw e;
+ }
+ }
+};
+
+WSConnection.prototype.__onData = function(data) {
+ if (Object.prototype.toString.call(data) == "[object ArrayBuffer]") {
+ data = new Uint8Array(data);
+ }
+ var buf = new Buffer(data);
+ this.transport.receiver(this.__decodeCallback.bind(this))(buf);
+
+};
+WSConnection.prototype.__onMessage = function(evt) {
+
+ this.__onData(evt.data);
+};
+
+WSConnection.prototype.__onError = function(evt) {
+ this.emit("error", evt);
+ this.socket.close();
+};
+
+/**
+ * Returns true if the transport is open
+ * @readonly
+ * @returns {boolean}
+ */
+WSConnection.prototype.isOpen = function() {
+ return this.socket && this.socket.readyState == this.socket.OPEN;
+};
+
+/**
+ * Opens the transport connection
+ */
+WSConnection.prototype.open = function() {
+ //If OPEN/CONNECTING/CLOSING ignore additional opens
+ if (this.socket && this.socket.readyState != this.socket.CLOSED) {
+ return;
+ }
+ //If there is no socket or the socket is closed:
+ this.socket = new WebSocket(this.uri(), "", this.wsOptions);
+ this.socket.binaryType = 'arraybuffer';
+ this.socket.onopen = this.__onOpen.bind(this);
+ this.socket.onmessage = this.__onMessage.bind(this);
+ this.socket.onerror = this.__onError.bind(this);
+ this.socket.onclose = this.__onClose.bind(this);
+};
+
+/**
+ * Closes the transport connection
+ */
+WSConnection.prototype.close = function() {
+ this.socket.close();
+};
+
+/**
+ * Return URI for the connection
+ * @returns {string} URI
+ */
+
+WSConnection.prototype.uri = function() {
+ var schema = this.secure ? 'wss' : 'ws';
+ var port = '';
+ var path = this.path || '/';
+ var host = this.host;
+
+ // avoid port if default for schema
+ if (this.port && (('wss' == schema && this.port != 443) ||
+ ('ws' == schema && this.port != 80))) {
+ port = ':' + this.port;
+ }
+
+ return schema + '://' + host + port + path;
+};
+
+/**
+ * Writes Thrift message data to the connection
+ * @param {Buffer} data - A Node.js Buffer containing the data to write
+ * @returns {void} No return value.
+ * @event {error} the "error" event is raised upon request failure passing the
+ * Node.js error object to the listener.
+ */
+WSConnection.prototype.write = function(data) {
+ if (this.isOpen()) {
+ //Send data and register a callback to invoke the client callback
+ this.socket.send(data);
+ } else {
+ //Queue the send to go out __onOpen
+ this.send_pending.push(data);
+ }
+};
+
+/**
+ * Creates a new WSConnection object, used by Thrift clients to connect
+ * to Thrift HTTP based servers.
+ * @param {string} host - The host name or IP to connect to.
+ * @param {number} port - The TCP port to connect to.
+ * @param {WSConnectOptions} options - The configuration options to use.
+ * @returns {WSConnection} The connection object.
+ * @see {@link WSConnectOptions}
+ */
+exports.createWSConnection = function(host, port, options) {
+ return new WSConnection(host, port, options);
+};
+
+/**
+ * Creates a new client object for the specified Thrift service.
+ * @param {object} cls - The module containing the service client
+ * @param {WSConnection} wsConnection - The connection to use.
+ * @returns {object} The client object.
+ * @see {@link createWSConnection}
+ */
+exports.createWSClient = function(cls, wsConnection) {
+ if (cls.Client) {
+ cls = cls.Client;
+ }
+ wsConnection.client =
+ new cls(new wsConnection.transport(undefined, function(buf) {
+ wsConnection.write(buf);
+ }),
+ wsConnection.protocol);
+ return wsConnection.client;
+};
diff --git a/lib/nodejs/package.json b/lib/nodejs/package.json
index d7e39e4..51216d2 100755
--- a/lib/nodejs/package.json
+++ b/lib/nodejs/package.json
@@ -2,38 +2,44 @@
"name": "thrift",
"description": "node.js bindings for the Apache Thrift RPC system",
"homepage": "http://thrift.apache.org/",
- "repository":
- { "type" : "git",
- "url" : "https://git-wip-us.apache.org/repos/asf/thrift.git"
- },
+ "repository": {
+ "type": "git",
+ "url": "https://git-wip-us.apache.org/repos/asf/thrift.git"
+ },
"version": "1.0.0-dev",
- "author":
- { "name": "Apache Thrift Developers",
- "email": "dev@thrift.apache.org",
- "url": "http://thrift.apache.org"
- },
- "licenses":
- [ { "type": "Apache-2.0",
- "url": "http://www.apache.org/licenses/LICENSE-2.0"
- }
- ],
- "bugs":
- { "mail": "dev@thrift.apache.org",
- "url": "https://issues.apache.org/jira/browse/THRIFT"
- },
- "directories" : { "lib" : "./lib/thrift" },
+ "author": {
+ "name": "Apache Thrift Developers",
+ "email": "dev@thrift.apache.org",
+ "url": "http://thrift.apache.org"
+ },
+ "licenses": [
+ {
+ "type": "Apache-2.0",
+ "url": "http://www.apache.org/licenses/LICENSE-2.0"
+ }
+ ],
+ "bugs": {
+ "mail": "dev@thrift.apache.org",
+ "url": "https://issues.apache.org/jira/browse/THRIFT"
+ },
+ "directories": {
+ "lib": "./lib/thrift"
+ },
"main": "./lib/thrift",
- "engines": { "node": ">= 0.2.4" },
+ "engines": {
+ "node": ">= 0.2.4"
+ },
"dependencies": {
"node-int64": "~0.3.0",
- "q": "1.0.x",
- "nodeunit": "~0.8.0"
+ "q": "1.0.x",
+ "nodeunit": "~0.8.0",
+ "ws": "~0.4.32"
},
"devDependencies": {
"connect": "2.7.x",
"commander": "2.1.x"
},
"scripts": {
- "test" : "test/testAll.sh"
+ "test": "test/testAll.sh"
}
}
diff --git a/lib/nodejs/test/testAll.sh b/lib/nodejs/test/testAll.sh
index e09c783..4008eec 100755
--- a/lib/nodejs/test/testAll.sh
+++ b/lib/nodejs/test/testAll.sh
@@ -59,6 +59,18 @@
return $RET
}
+testWSClientServer()
+{
+ echo " Testing WebSocket Client/Server with protocol $1 and transport $2 $3";
+ RET=0
+ node ${DIR}/http_server.js -p $1 -t $2 $3 &
+ SERVERPID=$!
+ sleep 1
+ node ${DIR}/ws_client.js -p $1 -t $2 $3 || RET=1
+ kill -9 $SERVERPID || RET=1
+ return $RET
+}
+
TESTOK=0
@@ -104,4 +116,14 @@
testHttpClientServer json buffered --promise || TESTOK=1
testHttpClientServer binary framed --ssl || TESTOK=1
+#WebSocket tests
+testWSClientServer compact buffered || TESTOK=1
+testWSClientServer compact framed || TESTOK=1
+testWSClientServer json buffered || TESTOK=1
+testWSClientServer json framed || TESTOK=1
+testWSClientServer binary buffered || TESTOK=1
+testWSClientServer binary framed || TESTOK=1
+testWSClientServer json buffered --promise || TESTOK=1
+testWSClientServer binary framed --ssl || TESTOK=1
+
exit $TESTOK
diff --git a/lib/nodejs/test/ws_client.js b/lib/nodejs/test/ws_client.js
new file mode 100644
index 0000000..4573246
--- /dev/null
+++ b/lib/nodejs/test/ws_client.js
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//This is the client side test for the standard Apache Thrift
+//"ThriftTest" suite. This client will test any protocol/transport
+//combination specified on the command line.
+
+var fs = require('fs');
+var assert = require('assert');
+var thrift = require('thrift');
+var ThriftTest = require('./gen-nodejs/ThriftTest');
+var ThriftTestDriver = require('./thrift_test_driver').ThriftTestDriver;
+var ThriftTestDriverPromise = require('./thrift_test_driver_promise').ThriftTestDriver;
+
+var program = require('commander');
+
+program
+ .option('-p, --protocol <protocol>', 'Set thrift protocol (binary|json) [protocol]')
+ .option('-t, --transport <transport>', 'Set thrift transport (buffered|framed) [transport]')
+ .option('--ssl', 'use wss instead of ws')
+ .option('--promise', 'test with promise style functions')
+ .parse(process.argv);
+
+var protocol = thrift.TBinaryProtocol;
+if (program.protocol === "json") {
+ protocol = thrift.TJSONProtocol;
+}
+
+var transport = thrift.TBufferedTransport;
+if (program.transport === "framed") {
+ transport = thrift.TFramedTransport;
+}
+
+var options = {
+ transport: transport,
+ protocol: protocol,
+ path: "/test"
+};
+
+if (program.ssl) {
+ options.wsOptions = { rejectUnauthorized: false };
+ options.secure = true;
+}
+
+var connection = thrift.createWSConnection("localhost", 9090, options);
+connection.open();
+
+var client = thrift.createWSClient(ThriftTest, connection);
+
+connection.on('error', function(err) {
+ assert(false, err);
+});
+
+var testDriver = ThriftTestDriver;
+if (program.promise) {
+ console.log(" --Testing promise style client");
+ testDriver = ThriftTestDriverPromise;
+}
+testDriver(client, function (status) {
+ console.log(status);
+ process.exit(0);
+});
+
+// to make it also run on expresso
+exports.expressoTest = function() {};