THRIFT-1129 Add BufferedTransport (non-framed) to Node.js library
Patch: Wade Simmons
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1090565 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/nodejs/examples/client_multitransport.js b/lib/nodejs/examples/client_multitransport.js
new file mode 100644
index 0000000..5c9e109
--- /dev/null
+++ b/lib/nodejs/examples/client_multitransport.js
@@ -0,0 +1,40 @@
+var thrift = require('thrift'), // Example client: stores a profile through one connection and reads it back through the other
+ ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+ ttypes = require('./gen-nodejs/user_types');
+
+var f_conn = thrift.createConnection('localhost', 9090), // default: framed
+ f_client = thrift.createClient(UserStorage, f_conn);
+var b_conn = thrift.createConnection('localhost', 9091, {transport: ttransport.TBufferedTransport}), // opts into the non-framed buffered transport
+ b_client = thrift.createClient(UserStorage, b_conn);
+var user1 = new ttypes.UserProfile({uid: 1,
+ name: "Mark Slee",
+ blurb: "I'll find something to put here."});
+var user2 = new ttypes.UserProfile({uid: 2,
+ name: "Satoshi Tagomori",
+ blurb: "ok, let's test with buffered transport."});
+
+f_conn.on('error', function(err) { // NOTE(review): only the framed connection gets an 'error' listener; b_conn has none
+ console.error("framed:", err);
+});
+
+f_client.store(user1, function(err, response) { // store via the framed client ...
+ if (err) { console.error(err); return; }
+
+ console.log("stored:", user1.uid, " as ", user1.name);
+ b_client.retrieve(user1.uid, function(err, responseUser) { // ... then read the same uid back via the buffered client
+ if (err) { console.error(err); return; }
+ console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+ });
+});
+
+b_client.store(user2, function(err, response) { // mirror case: store via the buffered client ...
+ if (err) { console.error(err); return; }
+
+ console.log("stored:", user2.uid, " as ", user2.name);
+ f_client.retrieve(user2.uid, function(err, responseUser) { // ... then read it back via the framed client
+ if (err) { console.error(err); return; }
+ console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+ });
+});
diff --git a/lib/nodejs/examples/server_multitransport.js b/lib/nodejs/examples/server_multitransport.js
new file mode 100644
index 0000000..e5d6d76
--- /dev/null
+++ b/lib/nodejs/examples/server_multitransport.js
@@ -0,0 +1,28 @@
+var thrift = require('thrift'), // Example server: serves the same UserStorage handlers on two ports, one per transport
+ ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+ ttypes = require('./gen-nodejs/user_types');
+
+var users = {}; // in-memory store keyed by uid; both servers below use it through the shared handlers
+
+var store = function(user, success) { // handler: remember the profile under its uid, then signal completion
+ console.log("stored:", user.uid);
+ users[user.uid] = user;
+ success();
+};
+var retrieve = function(uid, success) { // handler: pass whatever is stored under uid (undefined if nothing) to the callback
+ console.log("retrieved:", uid);
+ success(users[uid]);
+};
+
+var server_framed = thrift.createServer(UserStorage, { // no options argument, so createServer uses its default transport
+ store: store,
+ retrieve: retrieve
+});
+server_framed.listen(9090);
+var server_buffered = thrift.createServer(UserStorage, { // same handlers, explicitly served over TBufferedTransport
+ store: store,
+ retrieve: retrieve
+}, {transport: ttransport.TBufferedTransport});
+server_buffered.listen(9091);
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index e469f49..3530227 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -19,60 +19,26 @@
var sys = require('sys'),
EventEmitter = require("events").EventEmitter,
net = require('net'),
- TMemoryBuffer = require('./transport').TMemoryBuffer,
- TBinaryProtocol = require('./protocol').TBinaryProtocol;
+ ttransport = require('./transport'),
+ tprotocol = require('./protocol');
var BinaryParser = require('./binary_parser').BinaryParser;
BinaryParser.bigEndian = true;
-var int32FramedReceiver = exports.int32FramedReceiver = function (callback) {
- var frameLeft = 0,
- framePos = 0,
- frame = null;
-
- return function(data) {
- // var buf = new Buffer(data, 'binary');
- // console.log(buf);
- // framed transport
- while (data.length) {
- if (frameLeft === 0) {
- // TODO assumes we have all 4 bytes
- if (data.length < 4) {
- throw Error("Not enough bytes");
- }
- frameLeft = BinaryParser.toInt(data.slice(0,4));
- frame = new Buffer(frameLeft);
- framePos = 0;
- data = data.slice(4, data.length);
- }
-
- if (data.length >= frameLeft) {
- data.copy(frame, framePos, 0, frameLeft);
- data = data.slice(frameLeft, data.length);
-
- frameLeft = 0;
- callback(frame);
- } else if (data.length) {
- data.copy(frame, framePos, 0, data.length);
- frameLeft -= data.length;
- framePos += data.length;
- data = data.slice(data.length, data.length);
- }
- }
- };
-}
-
var Connection = exports.Connection = function(stream, options) {
var self = this;
EventEmitter.call(this);
this.connection = stream;
- this.offline_queue = [];
this.options = options || {};
+ this.transport = this.options.transport || ttransport.TFramedTransport;
+ this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.offline_queue = [];
this.connected = false;
this.connection.addListener("connect", function() {
self.connected = true;
+
this.setTimeout(self.options.timeout || 0);
this.setNoDelay();
this.frameLeft = 0;
@@ -99,17 +65,47 @@
self.emit("timeout");
});
- this.connection.addListener("data", int32FramedReceiver(function(data) {
- // console.log(typeof(data));
- var input = new TBinaryProtocol(new TMemoryBuffer(data));
- var r = input.readMessageBegin();
- // console.log(r);
- self.client['recv_' + r.fname](input, r.mtype, r.rseqid);
- // self.emit("data", data);
+ this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
+ var message = new self.protocol(transport_with_data);
+ try {
+ var header = message.readMessageBegin();
+ var dummy_seqid = header.rseqid * -1;
+ var client = self.client;
+ client._reqs[dummy_seqid] = function(err, success){
+ transport_with_data.commitPosition();
+
+ var callback = client._reqs[header.rseqid];
+ delete client._reqs[header.rseqid];
+ if (callback) {
+ callback(err, success);
+ }
+ };
+ client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ }
+ else {
+ throw e;
+ }
+ }
}));
-}
+};
sys.inherits(Connection, EventEmitter);
+Connection.prototype.end = function() { // Ends the connection by delegating to the wrapped stream's end().
+ this.connection.end();
+}
+
+Connection.prototype.write = function(data) { // Writes data to the stream as-is; unlike the removed version it no longer adds a length prefix itself.
+ if (!this.connected) { // not connected yet: park the buffer in offline_queue instead of writing
+ this.offline_queue.push(data);
+ return;
+ }
+ this.connection.write(data);
+}
+
exports.createConnection = function(host, port, options) {
var stream = net.createConnection(port, host);
var connection = new Connection(stream, options);
@@ -123,28 +119,12 @@
if (cls.Client) {
cls = cls.Client;
}
- var client = new cls(new TMemoryBuffer(undefined, function(buf) {
+ var client = new cls(new connection.transport(undefined, function(buf) {
connection.write(buf);
- }), TBinaryProtocol);
+ }), connection.protocol);
// TODO clean this up
connection.client = client;
return client;
}
-
-Connection.prototype.end = function() {
- this.connection.end();
-}
-
-Connection.prototype.write = function(buf) {
- // TODO: optimize this better, allocate one buffer instead of both:
- var msg = new Buffer(buf.length + 4);
- BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
- buf.copy(msg, 4, 0, buf.length);
- if (!this.connected) {
- this.offline_queue.push(msg);
- } else {
- this.connection.write(msg);
- }
-}
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index 43a7967..a73f23e 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -19,33 +19,41 @@
var sys = require('sys'),
net = require('net');
+var ttransport = require('./transport');
var BinaryParser = require('./binary_parser').BinaryParser,
- TMemoryBuffer = require('./transport').TMemoryBuffer,
- TBinaryProtocol = require('./protocol').TBinaryProtocol,
- int32FramedReceiver = require('./connection').int32FramedReceiver;
+ TBinaryProtocol = require('./protocol').TBinaryProtocol;
-exports.createServer = function(cls, handler) {
+exports.createServer = function(cls, handler, options) { // options (optional): {transport, protocol} constructors; returns the net server
if (cls.Processor) {
cls = cls.Processor;
}
var processor = new cls(handler);
+ var transport = (options && options.transport) ? options.transport : ttransport.TFramedTransport; // framed unless overridden
+ var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol; // binary unless overridden
return net.createServer(function(stream) {
- stream.on('data', int32FramedReceiver(function(data) {
- var input = new TBinaryProtocol(new TMemoryBuffer(data));
- var output = new TBinaryProtocol(new TMemoryBuffer(undefined, function(buf) {
- // TODO: optimize this better, allocate one buffer instead of both:
- var msg = new Buffer(buf.length + 4);
- BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
- buf.copy(msg, 4, 0, buf.length);
- stream.write(msg);
+ stream.on('data', transport.receiver(function(transport_with_data) { // the receiver hands us a transport holding the received bytes
+ var input = new protocol(transport_with_data);
+ var output = new protocol(new transport(undefined, function(buf) { // output transport: whatever it flushes goes straight to the socket
+ stream.write(buf);
}));
- processor.process(input, output);
+ try {
+ processor.process(input, output); // NOTE(review): runs once per receiver callback; a second message already sitting in a buffered transport seems to wait for the next 'data' event - confirm
+ transport_with_data.commitPosition(); // request fully read: let the transport drop the consumed bytes
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition(); // request incomplete: rewind so it is re-read when more data arrives
+ }
+ else {
+ throw e;
+ }
+ }
}));
stream.on('end', function() {
stream.end();
});
});
-}
+};
diff --git a/lib/nodejs/lib/thrift/transport.js b/lib/nodejs/lib/thrift/transport.js
index 0c10ef0..a926a6d 100644
--- a/lib/nodejs/lib/thrift/transport.js
+++ b/lib/nodejs/lib/thrift/transport.js
@@ -16,72 +16,225 @@
* specific language governing permissions and limitations
* under the License.
*/
-var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, flushCallback) {
- if (buffer !== undefined) {
- this.recv_buf = buffer;
- } else {
- this.recv_buf = new Buffer(0);
- }
- this.recv_buf_sz = this.recv_buf.length;
- this.send_buf = [];
- this.rpos = 0;
- this.flushCallback = flushCallback;
-}
+var BinaryParser = require('./binary_parser').BinaryParser;
-TMemoryBuffer.prototype.isOpen = function() {
- // TODO
- return true;
-}
+var emptyBuf = new Buffer(0);
-TMemoryBuffer.prototype.open = function() {
-}
+var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() { // Marker exception thrown by TBufferedTransport read()/readAll() when too few bytes are buffered.
+}; // NOTE(review): as written here it does not inherit from Error, so instances carry no message or stack
-TMemoryBuffer.prototype.close = function() {
-}
+var TFramedTransport = exports.TFramedTransport = function(buffer, callback) { // buffer: bytes to read from (optional); callback: receives each flushed, length-prefixed message (optional)
+ this.inBuf = buffer || emptyBuf; // read side; falls back to the shared zero-length buffer
+ this.outBuffers = []; // chunks written since the last flush
+ this.outCount = 0; // total byte length of outBuffers
+ this.readPos = 0; // next unread offset in inBuf
+ this.onFlush = callback;
+};
+TFramedTransport.receiver = function(callback) { // Returns a 'data' handler that reassembles length-prefixed frames and calls callback(new TFramedTransport(frame)) once per complete frame.
+ var frameLeft = 0, // bytes of the current frame still to arrive (0 = a new length header is expected)
+ framePos = 0, // write offset into frame
+ frame = null; // buffer collecting the frame being assembled
+ var residual = null; // partial (< 4 byte) length header carried over from the previous chunk
-TMemoryBuffer.prototype.read = function(len) {
- var avail = this.recv_buf_sz - this.rpos;
- // console.log("avail: " + avail);
-
- if(avail == 0)
- return new Buffer(0);
-
- var give = len
-
- if(avail < len) {
- console.log("asked for: " + len);
- throw new Error("asked for too much");
- give = avail
- }
-
- // console.log(this.rpos + "," + give);
- var ret = this.recv_buf.slice(this.rpos,this.rpos + give)
- this.rpos += give
- // console.log(ret);
-
- //clear buf when complete?
- return ret
-
-}
-
-TMemoryBuffer.prototype.readAll = function() {
- return this.recv_buf;
-}
-
-TMemoryBuffer.prototype.write = function(buf) {
- // TODO
- if (typeof(buf) === "string") {
- for (var i = 0; i < buf.length; ++i) {
- this.send_buf.push(buf.charCodeAt(i));
+ return function(data) {
+ // Prepend any residual data from our previous read
+ if (residual) {
+ var dat = new Buffer(data.length + residual.length);
+ residual.copy(dat, 0, 0);
+ data.copy(dat, residual.length, 0);
+ data = dat; // FIX: parse the joined buffer; it used to be built and then discarded, losing the stashed header bytes
+ residual = null;
}
- } else {
- for (var i = 0; i < buf.length; ++i) {
- this.send_buf.push(buf[i]);
- }
- }
-}
-TMemoryBuffer.prototype.flush = function() {
- this.flushCallback(new Buffer(this.send_buf));
- this.send_buf = [];
-}
+ // framed transport
+ while (data.length) {
+ if (frameLeft === 0) {
+ // A new frame starts with a 4-byte length header; stash a partial header until the rest arrives
+ if (data.length < 4) {
+ console.log("Expecting > 4 bytes, found only " + data.length);
+ residual = data;
+ break;
+ }
+ frameLeft = BinaryParser.toInt(data.slice(0,4));
+ frame = new Buffer(frameLeft);
+ framePos = 0;
+ data = data.slice(4, data.length);
+ }
+
+ if (data.length >= frameLeft) { // the rest of the frame is in this chunk: finish it and deliver
+ data.copy(frame, framePos, 0, frameLeft);
+ data = data.slice(frameLeft, data.length);
+
+ frameLeft = 0;
+ callback(new TFramedTransport(frame));
+ } else if (data.length) { // only part of the frame arrived: append it and wait for the next chunk
+ data.copy(frame, framePos, 0, data.length);
+ frameLeft -= data.length;
+ framePos += data.length;
+ data = data.slice(data.length, data.length);
+ }
+ }
+ };
+};
+
+TFramedTransport.prototype = {
+ commitPosition: function(){}, // no-op: the receiver builds a fresh instance per complete frame, so there is nothing to compact
+ rollbackPosition: function(){}, // no-op, present so callers can treat both transports alike
+
+ // TODO: Implement open/close support
+ isOpen: function() {return true;},
+ open: function() {},
+ close: function() {},
+
+ read: function(len) { // Returns the next len bytes as a slice of inBuf and advances readPos; throws Error if fewer remain.
+ var end = this.readPos + len;
+
+ if (this.inBuf.length < end) {
+ throw new Error('read(' + len + ') failed - not enough data');
+ }
+
+ var buf = this.inBuf.slice(this.readPos, end);
+ this.readPos = end;
+ return buf;
+ },
+
+ readAll: function() { // Returns the whole input buffer, ignoring and not moving readPos.
+ return this.inBuf;
+ },
+
+ write: function(buf, encoding) { // Queues buf (Buffer or string) for the next flush(); nothing is sent yet.
+ if (typeof(buf) === "string") {
+ // Defaulting to ascii encoding here since that's more like the original
+ // code, but I feel like 'utf8' would be a better choice.
+ buf = new Buffer(buf, encoding || 'ascii');
+ }
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+ },
+
+ flush: function() { // Joins the queued chunks, prefixes them with a 4-byte length, passes that to onFlush, then resets the queue.
+ var out = new Buffer(this.outCount),
+ pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(out, pos, 0);
+ pos += buf.length;
+ });
+
+ if (this.onFlush) { // without a flush callback the queued data is simply dropped below
+ // TODO: optimize this better, allocate one buffer instead of both:
+ var msg = new Buffer(out.length + 4);
+ BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4);
+ out.copy(msg, 4, 0, out.length);
+ this.onFlush(msg);
+ }
+
+ this.outBuffers = [];
+ this.outCount = 0;
+ }
+};
+
+var TBufferedTransport = exports.TBufferedTransport = function(buffer, callback) { // callback receives each flushed chunk; NOTE(review): the buffer argument is never used here (TFramedTransport does use its own) - confirm intent
+ this.defaultReadBufferSize = 1024;
+ this.writeBufferSize = 512; // Soft Limit
+ this.inBuf = new Buffer(this.defaultReadBufferSize);
+ this.readCursor = 0; // next unread offset in inBuf
+ this.writeCursor = 0; // for input buffer
+ this.outBuffers = []; // chunks written since the last flush
+ this.outCount = 0; // total byte length of outBuffers
+ this.onFlush = callback;
+};
+TBufferedTransport.receiver = function(callback) { // Returns a 'data' handler that appends every chunk to one long-lived transport and passes that transport to callback.
+ var reader = new TBufferedTransport(); // single instance reused for every chunk this handler receives
+
+ return function(data) {
+ if (reader.writeCursor + data.length > reader.inBuf.length) { // chunk will not fit: move the buffered bytes into a buffer of exactly the needed size
+ var buf = new Buffer(reader.writeCursor + data.length);
+ reader.inBuf.copy(buf, 0, 0, reader.writeCursor);
+ reader.inBuf = buf;
+ }
+ data.copy(reader.inBuf, reader.writeCursor, 0);
+ reader.writeCursor += data.length;
+
+ callback(reader); // invoked once per chunk, whether or not a whole message is buffered yet
+ };
+};
+
+TBufferedTransport.prototype = {
+ commitPosition: function(){ // Drops the bytes already read and moves the unread tail to the start of a fresh buffer.
+ var unreadedSize = this.writeCursor - this.readCursor;
+ var bufSize = (unreadedSize * 2 > this.defaultReadBufferSize) ? unreadedSize * 2 : this.defaultReadBufferSize;
+ var buf = new Buffer(bufSize);
+ if (unreadedSize > 0) {
+ this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor); // FIX: copy()'s 4th argument is the source END offset, not a length
+ }
+ this.readCursor = 0;
+ this.writeCursor = unreadedSize;
+ this.inBuf = buf;
+ },
+ rollbackPosition: function(){ // Rewinds to the start of the buffer so an incomplete message is re-read from its first byte.
+ this.readCursor = 0;
+ },
+
+ // TODO: Implement open/close support
+ isOpen: function() {return true;},
+ open: function() {},
+ close: function() {},
+
+ read: function(len) { // Returns a copy of the next len bytes and advances readCursor; throws InputBufferUnderrunError if fewer are buffered.
+ if (this.readCursor + len > this.writeCursor) {
+ throw new InputBufferUnderrunError();
+ }
+ var buf = new Buffer(len);
+ this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
+ this.readCursor += len;
+ return buf;
+ },
+
+ readAll: function() { // Returns a copy of every unread byte and marks them read; throws InputBufferUnderrunError if there are none.
+ if (this.readCursor >= this.writeCursor) {
+ throw new InputBufferUnderrunError();
+ }
+ var buf = new Buffer(this.writeCursor - this.readCursor);
+ this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor); // FIX: source END offset, not a length
+ this.readCursor = this.writeCursor;
+ return buf;
+ },
+
+ write: function(buf, encoding) { // Queues buf (Buffer or string) and flushes around the writeBufferSize soft limit.
+ if (typeof(buf) === "string") {
+ // Defaulting to ascii encoding here since that's more like the original
+ // code, but I feel like 'utf8' would be a better choice.
+ buf = new Buffer(buf, encoding || 'ascii');
+ }
+ if (this.outCount + buf.length > this.writeBufferSize) { // would exceed the limit: send what is queued first
+ this.flush();
+ }
+
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+
+ if (this.outCount >= this.writeBufferSize) { // limit reached (e.g. one oversized chunk): send immediately
+ this.flush();
+ }
+ },
+
+ flush: function() { // Joins the queued chunks into one buffer, passes it to onFlush with no length prefix, then resets the queue.
+ if (this.outCount < 1) {
+ return;
+ }
+
+ var msg = new Buffer(this.outCount),
+ pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(msg, pos, 0);
+ pos += buf.length;
+ });
+
+ if (this.onFlush) {
+ this.onFlush(msg);
+ }
+
+ this.outBuffers = [];
+ this.outCount = 0;
+ }
+};