THRIFT-2493:Node.js lib needs HTTP client
Client: node
Patch: Randy Abernethy
Adds http client to node.js along with tests.
diff --git a/lib/nodejs/lib/thrift/http_connection.js b/lib/nodejs/lib/thrift/http_connection.js
new file mode 100644
index 0000000..7eab320
--- /dev/null
+++ b/lib/nodejs/lib/thrift/http_connection.js
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var thrift = require('./thrift');
+var ttransport = require('./transport');
+var tprotocol = require('./protocol');
+
+var http = require('http');
+
+var HttpConnection = exports.HttpConnection = function(host, port, options) {
+ //Set configuration
+ var self = this;
+ this.options = options || {};
+ this.host = host;
+ this.port = port;
+ this.transport = this.options.transport || ttransport.TBufferedTransport;
+ this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+
+ //Prepare Node.js options
+ this.nodeOptions = {
+ host: this.host,
+ port: this.port || 80,
+ path: this.options.path || '/',
+ method: 'POST',
+ headers: this.options.headers || {},
+ tls: options.tls || {},
+ };
+
+ //The sequence map is used to map seqIDs back to the
+ // calling client in multiplexed scenarios
+ this.seqId2Service = {};
+
+ function decodeCallback(transport_with_data) {
+ var proto = new self.protocol(transport_with_data);
+ try {
+ while (true) {
+ var header = proto.readMessageBegin();
+ var dummy_seqid = header.rseqid * -1;
+ var client = self.client;
+ //The Multiplexed Protocol stores a hash of seqid to service names
+ // in seqId2Service. If the SeqId is found in the hash we need to
+ // lookup the appropriate client for this call.
+ // The client var is a single client object when not multiplexing,
+ // when using multiplexing it is a service name keyed hash of client
+ // objects.
+ //NOTE: The 2 way interdependencies between protocols, transports,
+ // connections and clients in the Node.js implementation are irregular
+ // and make the implementation difficult to extend and maintain. We
+ // should bring this stuff inline with typical thrift I/O stack
+ // operation soon.
+ // --ra
+ var service_name = self.seqId2Service[header.rseqid];
+ if (service_name) {
+ client = self.client[service_name];
+ delete self.seqId2Service[header.rseqid];
+ }
+ /*jshint -W083 */
+ client._reqs[dummy_seqid] = function(err, success){
+ transport_with_data.commitPosition();
+ var clientCallback = client._reqs[header.rseqid];
+ delete client._reqs[header.rseqid];
+ if (clientCallback) {
+ clientCallback(err, success);
+ }
+ };
+ /*jshint +W083 */
+ if(client['recv_' + header.fname]) {
+ client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
+ } else {
+ delete client._reqs[dummy_seqid];
+ throw new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
+ "Received a response to an unknown RPC function");
+ }
+ }
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ } else {
+ throw e;
+ }
+ }
+ };
+
+
+ //Response handler
+ //////////////////////////////////////////////////
+ this.responseCallback = function(response) {
+ var data = [];
+ var dataLen = 0;
+
+ response.on('error', function (err) {
+ console.log("Error in response: " + err);
+ });
+
+ response.on('data', function (chunk) {
+ data.push(chunk);
+ dataLen += chunk.length;
+ });
+
+ response.on('end', function(){
+ var buf = new Buffer(dataLen);
+ for (var i=0, len=data.length, pos=0; i<len; i++) {
+ data[i].copy(buf, pos);
+ pos += data[i].length;
+ }
+ //Get thre receiver function for the transport and
+ // call it with the buffer
+ self.transport.receiver(decodeCallback)(buf);
+ });
+ };
+};
+
+HttpConnection.prototype.write = function(data) {
+ var req = http.request(this.nodeOptions, this.responseCallback);
+
+ req.on('error', function(e) {
+ throw new thrift.TApplicationException(thrift.TApplicationExceptionType.UNKNOWN,
+ "Request failed");
+ });
+
+ req.write(data);
+ req.end();
+};
+
+exports.createHttpConnection = function(host, port, options) {
+ return new HttpConnection(host, port, options);
+};
+
+exports.createHttpClient = function(cls, httpConnection) {
+ if (cls.Client) {
+ cls = cls.Client;
+ }
+ return httpConnection.client =
+ new cls(new httpConnection.transport(undefined, function(buf) {httpConnection.write(buf);}),
+ httpConnection.protocol);
+};
+
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index dd965d2..3a865f8 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -26,6 +26,11 @@
exports.createStdIOClient = connection.createStdIOClient;
exports.createStdIOConnection = connection.createStdIOConnection;
+var httpConnection = require('./http_connection');
+exports.HttpConnection = httpConnection.HttpConnection;
+exports.createHttpConnection = httpConnection.createHttpConnection;
+exports.createHttpClient = httpConnection.createHttpClient;
+
var server = require('./server');
exports.createServer = server.createServer;
exports.createMultiplexServer = server.createMultiplexServer;
diff --git a/lib/nodejs/test/http_client.js b/lib/nodejs/test/http_client.js
new file mode 100644
index 0000000..08a7d27
--- /dev/null
+++ b/lib/nodejs/test/http_client.js
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//This is the client side test for the standard Apache Thrift
+//"ThriftTest" suite. This client will test any protocol/transport
+//combination specified on the command line.
+
+var fs = require('fs');
+var assert = require('assert');
+var thrift = require('thrift');
+var ThriftTest = require('./gen-nodejs/ThriftTest');
+var ThriftTestDriver = require('./thrift_test_driver').ThriftTestDriver;
+var ThriftTestDriverPromise = require('./thrift_test_driver_promise').ThriftTestDriver;
+
+var program = require('commander');
+
+program
+ .option('-p, --protocol <protocol>', 'Set thrift protocol (binary|json) [protocol]')
+ .option('-t, --transport <transport>', 'Set thrift transport (buffered|framed) [transport]')
+ .option('--ssl', 'use SSL transport')
+ .option('--promise', 'test with promise style functions')
+ .parse(process.argv);
+
+
+var protocol = thrift.TBinaryProtocol;
+if (program.protocol === "json") {
+ protocol = thrift.TJSONProtocol;
+}
+
+var transport = thrift.TBufferedTransport;
+if (program.transport === "framed") {
+ transport = thrift.TFramedTransport;
+}
+
+var options = {
+ transport: transport,
+ protocol: protocol,
+ headers: {"Connection": "close"},
+ path: "/test"
+};
+
+var connection = undefined;
+
+if (program.ssl) {
+ options.rejectUnauthorized = false;
+ connection = thrift.createHttpConnection("localhost", 9090, options);
+} else {
+ connection = thrift.createHttpConnection("localhost", 9090, options);
+}
+
+var client = thrift.createHttpClient(ThriftTest, connection);
+
+//connection.on('error', function(err) {
+// assert(false, err);
+//});
+
+var testDriver = ThriftTestDriver;
+if (program.promise) {
+ testDriver = ThriftTestDriverPromise;
+}
+testDriver(client, function (status) {
+ console.log(status);
+ process.exit(0);
+});
+
+// to make it also run on expresso
+exports.expressoTest = function() {};
diff --git a/lib/nodejs/test/http_server.js b/lib/nodejs/test/http_server.js
new file mode 100644
index 0000000..d8ef73f
--- /dev/null
+++ b/lib/nodejs/test/http_server.js
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * 'License'); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var fs = require('fs');
+var path = require('path');
+var thrift = require('thrift');
+var ThriftTest = require('./gen-nodejs/ThriftTest');
+var ThriftTestHandler = require('./test_handler').ThriftTestHandler;
+var ThriftTestHandlerPromise = require('./test_handler_promise').ThriftTestHandler;
+
+var program = require('commander');
+
+program
+ .option('-p, --protocol <protocol>', 'Set thift protocol (binary|json) [protocol]')
+ .option('-t, --transport <transport>', 'Set thift transport (buffered|framed) [transport]')
+ .option('--ssl', 'use ssl transport')
+ .option('--promise', 'test with promise style functions')
+ .parse(process.argv);
+
+var transport = thrift.TBufferedTransport;
+if (program.transport === "framed") {
+ transport = thrift.TFramedTransport;
+}
+
+var protocol = thrift.TBinaryProtocol;
+if (program.protocol === "json") {
+ protocol = thrift.TJSONProtocol;
+}
+
+var handler = ThriftTestHandler;
+if (program.promise) {
+ handler = ThriftTestHandlerPromise;
+}
+
+var SvcOpt = {
+ handler: handler,
+ processor: ThriftTest,
+ protocol: protocol,
+ transport: transport
+};
+var serverOpt = { services: { "/test": SvcOpt } }
+thrift.createWebServer(serverOpt).listen(9090);
+
+
diff --git a/lib/nodejs/test/testAll.sh b/lib/nodejs/test/testAll.sh
index 96f8a2a..87bbb9d 100755
--- a/lib/nodejs/test/testAll.sh
+++ b/lib/nodejs/test/testAll.sh
@@ -47,6 +47,18 @@
return $RET
}
+testHttpClientServer()
+{
+ echo " Testing HTTP Client/Server with protocol $1 and transport $2 $3";
+ RET=0
+ node ${DIR}/http_server.js -p $1 -t $2 $3 &
+ SERVERPID=$!
+ sleep 1
+ node ${DIR}/http_client.js -p $1 -t $2 $3 || RET=1
+ kill -9 $SERVERPID || RET=1
+ return $RET
+}
+
TESTOK=0
@@ -60,6 +72,7 @@
#integration tests
+#TCP connection tests
testClientServer binary buffered || TESTOK=1
testClientServer json buffered || TESTOK=1
testClientServer binary framed || TESTOK=1
@@ -78,4 +91,10 @@
#test promise style
testClientServer binary framed --promise || TESTOK=1
+#HTTP tests
+testHttpClientServer json buffered || TESTOK=1
+testHttpClientServer json framed || TESTOK=1
+testHttpClientServer binary buffered || TESTOK=1
+testHttpClientServer binary framed || TESTOK=1
+
exit $TESTOK
diff --git a/lib/nodejs/test/thrift_test_driver.js b/lib/nodejs/test/thrift_test_driver.js
index ccc105b..a21c9c5 100644
--- a/lib/nodejs/test/thrift_test_driver.js
+++ b/lib/nodejs/test/thrift_test_driver.js
@@ -288,7 +288,7 @@
test_complete = true;
});
-//We wait up to retry_limit * retry_interval for the test suite to complete
+ //We wait up to retry_limit * retry_interval for the test suite to complete
function TestForCompletion() {
if(test_complete) {
if (callback) {
@@ -298,14 +298,14 @@
if (++retrys < retry_limit) {
setTimeout(TestForCompletion, retry_interval);
} else {
- if (callback) {
- callback("Server test failed to complete after " +
- (retry_limit*retry_interval/1000) + " seconds");
- }
+ if (callback) {
+ callback("Server test failed to complete after " +
+ (retry_limit*retry_interval/1000) + " seconds");
+ }
}
}
}
setTimeout(TestForCompletion, retry_interval);
})();
-}
+};