THRIFT-2405:Node.js Multiplexer tests fail (silently)
Client: node
Patch: Randy Abernethy
Repairs client side multiplex protocol.
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index 36451d5..a3c2d79 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -21,14 +21,16 @@
net = require('net'),
tls = require('tls'),
ttransport = require('./transport'),
- tprotocol = require('./protocol');
+ tprotocol = require('./protocol'),
+ thrift = require('./thrift');
var binary = require('./binary');
var Connection = exports.Connection = function(stream, options) {
var self = this;
EventEmitter.call(this);
-
+
+ this.seqId2Service = {};
this.connection = stream;
this.options = options || {};
this.transport = this.options.transport || ttransport.TBufferedTransport;
@@ -37,18 +39,21 @@
this.connected = false;
this._debug = this.options.debug || false;
- if (this.options.max_attempts
- && !isNaN(this.options.max_attempts) && this.options.max_attempts > 0) {
+ if (this.options.max_attempts &&
+ !isNaN(this.options.max_attempts) &&
+ this.options.max_attempts > 0) {
this.max_attempts = +this.options.max_attempts;
}
this.retry_max_delay = null;
- if (this.options.retry_max_delay !== undefined
- && !isNaN(this.options.retry_max_delay) && this.options.retry_max_delay > 0) {
+ if (this.options.retry_max_delay !== undefined &&
+ !isNaN(this.options.retry_max_delay) &&
+ this.options.retry_max_delay > 0) {
this.retry_max_delay = this.options.retry_max_delay;
}
this.connect_timeout = false;
- if (this.options.connect_timeout
- && !isNaN(this.options.connect_timeout) && this.options.connect_timeout > 0) {
+ if (this.options.connect_timeout &&
+ !isNaN(this.options.connect_timeout) &&
+ this.options.connect_timeout > 0) {
this.connect_timeout = +this.options.connect_timeout;
}
this.connection.addListener("connect", function() {
@@ -89,9 +94,9 @@
this.connection.addListener("error", function(err) {
// Only emit the error if no-one else is listening on the connection
// or if someone is listening on us
- if (self.connection.listeners('error').length === 1
- || self.listeners('error').length > 0) {
- self.emit("error", err)
+ if (self.connection.listeners('error').length === 1 ||
+ self.listeners('error').length > 0) {
+ self.emit("error", err);
}
// "error" events get turned into exceptions if they aren't listened for. If the user handled this error
// then we should try to reconnect.
@@ -114,7 +119,25 @@
var header = message.readMessageBegin();
var dummy_seqid = header.rseqid * -1;
var client = self.client;
- client._reqs[dummy_seqid] = function(err, success){
+ //The Multiplexed Protocol stores a hash of seqid to service names
+ // in seqId2Service. If the SeqId is found in the hash we need to
+ // lookup the appropriate client for this call.
+ // The connection.client object is a single client object when not
+ // multiplexing, when using multiplexing it is a service name keyed
+ // hash of client objects.
+ //NOTE: The 2 way interdependencies between protocols, transports,
+ // connections and clients in the Node.js implementation are irregular
+ // and make the implementation difficult to extend and maintain. We
+ // should bring this stuff inline with typical thrift I/O stack
+ // operation soon.
+ // --ra
+ var service_name = self.seqId2Service[header.rseqid];
+ if (service_name) {
+ client = self.client[service_name];
+ delete self.seqId2Service[header.rseqid];
+ }
+ /*jshint -W083 */
+ client._reqs[dummy_seqid] = function(err, success){
transport_with_data.commitPosition();
var callback = client._reqs[header.rseqid];
@@ -123,13 +146,15 @@
callback(err, success);
}
};
+ /*jshint +W083 */
- if(!client['recv_' + header.fname]) {
- // msg was for another serivce, just drop it
- delete client._reqs[dummy_seqid]
- return
+ if(client['recv_' + header.fname]) {
+ client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+ } else {
+ delete client._reqs[dummy_seqid];
+ throw new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
+ "Received a response to an unknown RPC function");
}
- client['recv_' + header.fname](message, header.mtype, dummy_seqid);
}
}
catch (e) {
@@ -146,7 +171,7 @@
Connection.prototype.end = function() {
this.connection.end();
-}
+};
Connection.prototype.initialize_retry_vars = function () {
this.retry_timer = null;
@@ -162,14 +187,14 @@
return;
}
this.connection.write(data);
-}
+};
Connection.prototype.connection_gone = function () {
var self = this;
// If a retry is already in progress, just let that happen
if (this.retry_timer) {
- return;
+ return;
}
if (!this.max_attempts) {
self.emit("close");
@@ -228,7 +253,7 @@
connection.port = port;
return connection;
-}
+};
exports.createSSLConnection = function(host, port, options) {
var stream = tls.connect(port, host, options);
@@ -237,7 +262,7 @@
connection.port = port;
return connection;
-}
+};
exports.createClient = function(cls, connection) {
@@ -252,7 +277,7 @@
connection.client = client;
return client;
-}
+};
var child_process = require('child_process');
var StdIOConnection = exports.StdIOConnection = function(command, options) {
@@ -326,14 +351,13 @@
}
}
}));
-
};
util.inherits(StdIOConnection, EventEmitter);
StdIOConnection.prototype.end = function() {
this.connection.end();
-}
+};
StdIOConnection.prototype.write = function(data) {
if (!this.connected) {
@@ -341,10 +365,10 @@
return;
}
this.connection.write(data);
-}
+};
+
exports.createStdIOConnection = function(command,options){
return new StdIOConnection(command,options);
-
};
exports.createStdIOClient = function(cls,connection) {
@@ -360,4 +384,4 @@
connection.client = client;
return client;
-}
+};
diff --git a/lib/nodejs/lib/thrift/multiplexed_processor.js b/lib/nodejs/lib/thrift/multiplexed_processor.js
index 2931c4f..1aef4c3 100644
--- a/lib/nodejs/lib/thrift/multiplexed_processor.js
+++ b/lib/nodejs/lib/thrift/multiplexed_processor.js
@@ -38,7 +38,7 @@
var sname = p[0];
var fname = p[1];
- if (! sname in this.services) {
+ if (! (sname in this.services)) {
throw new Thrift.TException("TMultiplexedProcessor: Unknown service: " + sname);
}
diff --git a/lib/nodejs/lib/thrift/multiplexed_protocol.js b/lib/nodejs/lib/thrift/multiplexed_protocol.js
index 9a955ab..68440af 100644
--- a/lib/nodejs/lib/thrift/multiplexed_protocol.js
+++ b/lib/nodejs/lib/thrift/multiplexed_protocol.js
@@ -19,27 +19,31 @@
var util = require('util');
var Thrift = require('./thrift');
-var Wrapper = exports.Wrapper = function(service_name, protocol) {
+var Wrapper = exports.Wrapper = function(service_name, protocol, connection) {
var MultiplexProtocol = function(trans, strictRead, strictWrite) {
protocol.call(this, trans, strictRead, strictWrite);
- }
+ };
util.inherits(MultiplexProtocol, protocol);
MultiplexProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
-
- if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY)
- MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, service_name + ":" + name, type, seqid);
- else
+ if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) {
+ connection.seqId2Service[seqid] = service_name;
+ MultiplexProtocol.super_.prototype.writeMessageBegin.call(this,
+ service_name + ":" + name,
+ type,
+ seqid);
+ } else {
MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, name, type, seqid);
- }
+ }
+ };
return MultiplexProtocol;
-}
+};
var Multiplexer = exports.Multiplexer = function() {
this.seqid = 0;
-}
+};
Multiplexer.prototype.createClient = function(service_name, cls, connection) {
if (cls.Client) {
@@ -49,15 +53,15 @@
cls.prototype.new_seqid = function() {
self.seqid += 1;
return self.seqid;
- }
-
+ };
var client = new cls(new connection.transport(undefined, function(buf) {
connection.write(buf);
- }), new Wrapper(service_name, connection.protocol));
-
-
- // TODO clean this up
- connection.client = client;
+ }), new Wrapper(service_name, connection.protocol, connection));
+
+ if (typeof connection.client !== 'object') {
+ connection.client = {};
+ }
+ connection.client[service_name] = client;
return client;
-}
+};