THRIFT-1129 Add BufferedTransport (non-framed) to Node.js library
Patch: Wade Simmons


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1090565 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/nodejs/examples/client_multitransport.js b/lib/nodejs/examples/client_multitransport.js
new file mode 100644
index 0000000..5c9e109
--- /dev/null
+++ b/lib/nodejs/examples/client_multitransport.js
@@ -0,0 +1,40 @@
+var thrift = require('thrift'),
+    ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+    ttypes = require('./gen-nodejs/user_types');
+
+var f_conn = thrift.createConnection('localhost', 9090), // default: framed
+    f_client = thrift.createClient(UserStorage, f_conn);
+var b_conn = thrift.createConnection('localhost', 9091, {transport: ttransport.TBufferedTransport}),
+    b_client = thrift.createClient(UserStorage, b_conn);
+var user1 = new ttypes.UserProfile({uid: 1,
+                                    name: "Mark Slee",
+                                    blurb: "I'll find something to put here."});
+var user2 = new ttypes.UserProfile({uid: 2,
+                                    name: "Satoshi Tagomori",
+                                    blurb: "ok, let's test with buffered transport."});
+
+f_conn.on('error', function(err) {
+  console.error("framed:", err);
+});
+
+f_client.store(user1, function(err, response) {
+  if (err) { console.error(err); return; }
+
+  console.log("stored:", user1.uid, " as ", user1.name);
+  b_client.retrieve(user1.uid, function(err, responseUser) {
+    if (err) { console.error(err); return; }
+    console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+  });
+});
+
+b_client.store(user2, function(err, response) {
+  if (err) { console.error(err); return; }
+
+  console.log("stored:", user2.uid, " as ", user2.name);
+  f_client.retrieve(user2.uid, function(err, responseUser) {
+    if (err) { console.error(err); return; }
+    console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+  });
+});
diff --git a/lib/nodejs/examples/server_multitransport.js b/lib/nodejs/examples/server_multitransport.js
new file mode 100644
index 0000000..e5d6d76
--- /dev/null
+++ b/lib/nodejs/examples/server_multitransport.js
@@ -0,0 +1,28 @@
+var thrift = require('thrift'),
+    ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+    ttypes = require('./gen-nodejs/user_types');
+
+var users = {};
+
+var store = function(user, success) {
+  console.log("stored:", user.uid);
+  users[user.uid] = user;
+  success();
+};
+var retrieve = function(uid, success) {
+  console.log("retrieved:", uid);
+  success(users[uid]);
+};
+
+var server_framed = thrift.createServer(UserStorage, {
+  store: store,
+  retrieve: retrieve
+});
+server_framed.listen(9090);
+var server_buffered = thrift.createServer(UserStorage, {
+ store: store,
+ retrieve: retrieve
+}, {transport: ttransport.TBufferedTransport});
+server_buffered.listen(9091);
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index e469f49..3530227 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -19,60 +19,26 @@
 var sys = require('sys'),
     EventEmitter = require("events").EventEmitter,
     net = require('net'),
-    TMemoryBuffer = require('./transport').TMemoryBuffer,
-    TBinaryProtocol = require('./protocol').TBinaryProtocol;
+    ttransport = require('./transport'),
+    tprotocol = require('./protocol');
 
 var BinaryParser = require('./binary_parser').BinaryParser;
 BinaryParser.bigEndian = true;
 
-var int32FramedReceiver = exports.int32FramedReceiver = function (callback) {
-  var frameLeft = 0,
-      framePos = 0,
-      frame = null;
-
-  return function(data) {
-    // var buf = new Buffer(data, 'binary');
-    // console.log(buf);
-    // framed transport
-    while (data.length) {
-      if (frameLeft === 0) {
-        // TODO assumes we have all 4 bytes
-        if (data.length < 4) {
-          throw Error("Not enough bytes");
-        }
-        frameLeft = BinaryParser.toInt(data.slice(0,4));
-        frame = new Buffer(frameLeft);
-        framePos = 0;
-        data = data.slice(4, data.length);
-      }
-
-      if (data.length >= frameLeft) {
-        data.copy(frame, framePos, 0, frameLeft);
-        data = data.slice(frameLeft, data.length);
-
-        frameLeft = 0;
-        callback(frame);
-      } else if (data.length) {
-        data.copy(frame, framePos, 0, data.length);
-        frameLeft -= data.length;
-        framePos += data.length;
-        data = data.slice(data.length, data.length);
-      }
-    }
-  };
-}
-
 var Connection = exports.Connection = function(stream, options) {
   var self = this;
   EventEmitter.call(this);
 
   this.connection = stream;
-  this.offline_queue = [];
   this.options = options || {};
+  this.transport = this.options.transport || ttransport.TFramedTransport;
+  this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+  this.offline_queue = [];
   this.connected = false;
 
   this.connection.addListener("connect", function() {
     self.connected = true;
+
     this.setTimeout(self.options.timeout || 0);
     this.setNoDelay();
     this.frameLeft = 0;
@@ -99,17 +65,47 @@
     self.emit("timeout");
   });
 
-  this.connection.addListener("data", int32FramedReceiver(function(data) {
-    // console.log(typeof(data));
-    var input = new TBinaryProtocol(new TMemoryBuffer(data));
-    var r = input.readMessageBegin();
-    // console.log(r);
-    self.client['recv_' + r.fname](input, r.mtype, r.rseqid);
-    // self.emit("data", data);
+  this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
+    var message = new self.protocol(transport_with_data);
+    try {
+      var header = message.readMessageBegin();
+      var dummy_seqid = header.rseqid * -1;
+      var client = self.client;
+      client._reqs[dummy_seqid] = function(err, success){
+        transport_with_data.commitPosition();
+
+        var callback = client._reqs[header.rseqid];
+        delete client._reqs[header.rseqid];
+        if (callback) {
+          callback(err, success);
+        }
+      };
+      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+    }
+    catch (e) {
+      if (e instanceof ttransport.InputBufferUnderrunError) {
+        transport_with_data.rollbackPosition();
+      }
+      else {
+        throw e;
+      }
+    }
   }));
-}
+};
 sys.inherits(Connection, EventEmitter);
 
+Connection.prototype.end = function() {
+  this.connection.end();
+}
+
+Connection.prototype.write = function(data) {
+  if (!this.connected) {
+    this.offline_queue.push(data);
+    return;
+  }
+  this.connection.write(data);
+}
+
 exports.createConnection = function(host, port, options) {
   var stream = net.createConnection(port, host);
   var connection = new Connection(stream, options);
@@ -123,28 +119,12 @@
   if (cls.Client) {
     cls = cls.Client;
   }
-  var client = new cls(new TMemoryBuffer(undefined, function(buf) {
+  var client = new cls(new connection.transport(undefined, function(buf) {
     connection.write(buf);
-  }), TBinaryProtocol);
+  }), connection.protocol);
 
   // TODO clean this up
   connection.client = client;
 
   return client;
 }
-
-Connection.prototype.end = function() {
-  this.connection.end();
-}
-
-Connection.prototype.write = function(buf) {
-  // TODO: optimize this better, allocate one buffer instead of both:
-  var msg = new Buffer(buf.length + 4);
-  BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
-  buf.copy(msg, 4, 0, buf.length);
-  if (!this.connected) {
-    this.offline_queue.push(msg);
-  } else {
-    this.connection.write(msg);
-  }
-}
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index 43a7967..a73f23e 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -19,33 +19,41 @@
 var sys = require('sys'),
     net = require('net');
 
+var ttransport = require('./transport');
 var BinaryParser = require('./binary_parser').BinaryParser,
-    TMemoryBuffer = require('./transport').TMemoryBuffer,
-    TBinaryProtocol = require('./protocol').TBinaryProtocol,
-    int32FramedReceiver = require('./connection').int32FramedReceiver;
+    TBinaryProtocol = require('./protocol').TBinaryProtocol;
 
-exports.createServer = function(cls, handler) {
+exports.createServer = function(cls, handler, options) {
   if (cls.Processor) {
     cls = cls.Processor;
   }
   var processor = new cls(handler);
+  var transport = (options && options.transport) ? options.transport : ttransport.TFramedTransport;
+  var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol;
 
   return net.createServer(function(stream) {
-    stream.on('data', int32FramedReceiver(function(data) {
-      var input = new TBinaryProtocol(new TMemoryBuffer(data));
-      var output = new TBinaryProtocol(new TMemoryBuffer(undefined, function(buf) {
-        // TODO: optimize this better, allocate one buffer instead of both:
-        var msg = new Buffer(buf.length + 4);
-        BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
-        buf.copy(msg, 4, 0, buf.length);
-        stream.write(msg);
+    stream.on('data', transport.receiver(function(transport_with_data) {
+      var input = new protocol(transport_with_data);
+      var output = new protocol(new transport(undefined, function(buf) {
+        stream.write(buf);
       }));
 
-      processor.process(input, output);
+      try {
+        processor.process(input, output);
+        transport_with_data.commitPosition();
+      }
+      catch (e) {
+        if (e instanceof ttransport.InputBufferUnderrunError) {
+          transport_with_data.rollbackPosition();
+        }
+        else {
+          throw e;
+        }
+      }
     }));
 
     stream.on('end', function() {
       stream.end();
     });
   });
-}
+};
diff --git a/lib/nodejs/lib/thrift/transport.js b/lib/nodejs/lib/thrift/transport.js
index 0c10ef0..a926a6d 100644
--- a/lib/nodejs/lib/thrift/transport.js
+++ b/lib/nodejs/lib/thrift/transport.js
@@ -16,72 +16,225 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, flushCallback) {
-  if (buffer !== undefined) {
-    this.recv_buf = buffer;
-  } else {
-    this.recv_buf = new Buffer(0);
-  }
-  this.recv_buf_sz = this.recv_buf.length;
-  this.send_buf = [];
-  this.rpos = 0;
-  this.flushCallback = flushCallback;
-}
+var BinaryParser = require('./binary_parser').BinaryParser;
 
-TMemoryBuffer.prototype.isOpen = function() {
-  // TODO
-  return true;
-}
+var emptyBuf = new Buffer(0);
 
-TMemoryBuffer.prototype.open = function() {
-}
+var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() {
+};
 
-TMemoryBuffer.prototype.close = function() {
-}
+var TFramedTransport = exports.TFramedTransport = function(buffer, callback) {
+  this.inBuf = buffer || emptyBuf;
+  this.outBuffers = [];
+  this.outCount = 0;
+  this.readPos = 0;
+  this.onFlush = callback;
+};
+TFramedTransport.receiver = function(callback) {
+  var frameLeft = 0,
+      framePos = 0,
+      frame = null;
+  var residual = null;
 
-TMemoryBuffer.prototype.read = function(len) {
-  var avail = this.recv_buf_sz - this.rpos;
-  // console.log("avail: " + avail);
-
-  if(avail == 0)
-      return new Buffer(0);
-
-  var give = len
-
-  if(avail < len) {
-      console.log("asked for: " + len);
-      throw new Error("asked for too much");
-      give = avail
-  }
-
-  // console.log(this.rpos + "," + give);
-  var ret = this.recv_buf.slice(this.rpos,this.rpos + give)
-  this.rpos += give
-  // console.log(ret);
-
-  //clear buf when complete?
-  return ret
-
-}
-
-TMemoryBuffer.prototype.readAll = function() {
-  return this.recv_buf;
-}
-
-TMemoryBuffer.prototype.write = function(buf) {
-  // TODO
-  if (typeof(buf) === "string") {
-    for (var i = 0; i < buf.length; ++i) {
-      this.send_buf.push(buf.charCodeAt(i));
+  return function(data) {
+    // Prepend any residual data from our previous read
+    if (residual) {
+      var dat = new Buffer(data.length + residual.length);
+      residual.copy(dat, 0, 0);
+      data.copy(dat, residual.length, 0);
+      residual = null;
     }
-  } else {
-    for (var i = 0; i < buf.length; ++i) {
-      this.send_buf.push(buf[i]);
-    }
-  }
-}
 
-TMemoryBuffer.prototype.flush = function() {
-  this.flushCallback(new Buffer(this.send_buf));
-  this.send_buf = [];
-}
+    // framed transport
+    while (data.length) {
+      if (frameLeft === 0) {
+        // TODO assumes we have all 4 bytes
+        if (data.length < 4) {
+          console.log("Expecting > 4 bytes, found only " + data.length);
+          residual = data;
+          break;
+          //throw Error("Expecting > 4 bytes, found only " + data.length);
+        }
+        frameLeft = BinaryParser.toInt(data.slice(0,4));
+        frame = new Buffer(frameLeft);
+        framePos = 0;
+        data = data.slice(4, data.length);
+      }
+      
+      if (data.length >= frameLeft) {
+        data.copy(frame, framePos, 0, frameLeft);
+        data = data.slice(frameLeft, data.length);
+        
+        frameLeft = 0;
+        callback(new TFramedTransport(frame));
+      } else if (data.length) {
+        data.copy(frame, framePos, 0, data.length);
+        frameLeft -= data.length;
+        framePos += data.length;
+        data = data.slice(data.length, data.length);
+      }
+    }
+  };
+};
+
+TFramedTransport.prototype = {
+  commitPosition: function(){},
+  rollbackPosition: function(){},
+
+  // TODO: Implement open/close support
+  isOpen: function() {return true;},
+  open: function() {},
+  close: function() {},
+
+  read: function(len) { // this function will be used for each frames.
+    var end = this.readPos + len;
+
+    if (this.inBuf.length < end) {
+      throw new Error('read(' + len + ') failed - not enough data');
+    }
+
+    var buf = this.inBuf.slice(this.readPos, end);
+    this.readPos = end;
+    return buf;
+  },
+
+  readAll: function() {
+    return this.inBuf;
+  },
+
+  write: function(buf, encoding) {
+    if (typeof(buf) === "string") {
+      // Defaulting to ascii encoding here since that's more like the original
+      // code, but I feel like 'utf8' would be a better choice.
+      buf = new Buffer(buf, encoding || 'ascii');
+    }
+    this.outBuffers.push(buf);
+    this.outCount += buf.length;
+  },
+
+  flush: function() {
+    var out = new Buffer(this.outCount),
+        pos = 0;
+    this.outBuffers.forEach(function(buf) {
+      buf.copy(out, pos, 0);
+      pos += buf.length;
+    });
+    
+    if (this.onFlush) {
+      // TODO: optimize this better, allocate one buffer instead of both:
+      var msg = new Buffer(out.length + 4);
+      BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4);
+      out.copy(msg, 4, 0, out.length);
+      this.onFlush(msg);
+    }
+
+    this.outBuffers = [];
+    this.outCount = 0;
+  }
+};
+
+var TBufferedTransport = exports.TBufferedTransport = function(buffer, callback) {
+  this.defaultReadBufferSize = 1024;
+  this.writeBufferSize = 512; // Soft Limit
+  this.inBuf = new Buffer(this.defaultReadBufferSize);
+  this.readCursor = 0;
+  this.writeCursor = 0; // for input buffer
+  this.outBuffers = [];
+  this.outCount = 0;
+  this.onFlush = callback;
+};
+TBufferedTransport.receiver = function(callback) {
+  var reader = new TBufferedTransport();
+
+  return function(data) {
+    if (reader.writeCursor + data.length > reader.inBuf.length) {
+      var buf = new Buffer(reader.writeCursor + data.length);
+      reader.inBuf.copy(buf, 0, 0, reader.writeCursor);
+      reader.inBuf = buf;
+    }
+    data.copy(reader.inBuf, reader.writeCursor, 0);
+    reader.writeCursor += data.length;
+
+    callback(reader);
+  };
+};
+
+TBufferedTransport.prototype = {
+  commitPosition: function(){
+    var unreadedSize = this.writeCursor - this.readCursor;
+    var bufSize = (unreadedSize * 2 > this.defaultReadBufferSize) ? unreadedSize * 2 : this.defaultReadBufferSize;
+    var buf = new Buffer(bufSize);
+    if (unreadedSize > 0) {
+      this.inBuf.copy(buf, 0, this.readCursor, unreadedSize);
+    }
+    this.readCursor = 0;
+    this.writeCursor = unreadedSize;
+    this.inBuf = buf;
+  },
+  rollbackPosition: function(){
+    this.readCursor = 0;
+  },
+
+  // TODO: Implement open/close support
+  isOpen: function() {return true;},
+  open: function() {},
+  close: function() {},
+
+  read: function(len) {
+    if (this.readCursor + len > this.writeCursor) {
+      throw new InputBufferUnderrunError();
+    }
+    var buf = new Buffer(len);
+    this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
+    this.readCursor += len;
+    return buf;
+  },
+
+  readAll: function() {
+    if (this.readCursor >= this.writeCursor) {
+      throw new InputBufferUnderrunError();
+    }
+    var buf = new Buffer(this.writeCursor - this.readCursor);
+    this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor - this.readCursor);
+    this.readCursor = this.writeCursor;
+    return buf;
+  },
+
+  write: function(buf, encoding) {
+    if (typeof(buf) === "string") {
+      // Defaulting to ascii encoding here since that's more like the original
+      // code, but I feel like 'utf8' would be a better choice.
+      buf = new Buffer(buf, encoding || 'ascii');
+    }
+    if (this.outCount + buf.length > this.writeBufferSize) {
+      this.flush();
+    }
+
+    this.outBuffers.push(buf);
+    this.outCount += buf.length;
+
+    if (this.outCount >= this.writeBufferSize) {
+      this.flush();
+    }
+  },
+
+  flush: function() {
+    if (this.outCount < 1) {
+      return;
+    }
+    
+    var msg = new Buffer(this.outCount),
+        pos = 0;
+    this.outBuffers.forEach(function(buf) {
+      buf.copy(msg, pos, 0);
+      pos += buf.length;
+    });
+    
+    if (this.onFlush) {
+      this.onFlush(msg);
+    }
+
+    this.outBuffers = [];
+    this.outCount = 0;
+  }
+};