Thrift-1353: Switch to performance branch, get rid of BinaryParser
Client: Node.js
Patch: Wade Simmons

"binary" fields will be returned as Buffers instead of Strings
"int64" fields will be returned as http://github.com/broofa/node-int64 objects


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1401081 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/nodejs/README.md b/lib/nodejs/README.md
index 2832c1b..02dfc74 100644
--- a/lib/nodejs/README.md
+++ b/lib/nodejs/README.md
@@ -55,6 +55,15 @@
       connection.end();
     });
 
+<a name="int64"></a>
+## Int64
+
+Since JavaScript represents all numbers as doubles, int64 values cannot be accurately represented naturally. To solve this, int64 values in responses will be wrapped with Thirft.Int64 objects. The Int64 implementation used is [broofa/node-int64](https://github.com/broofa/node-int64).
+
+## Libraries using node-thrift
+
+* [yukim/node_cassandra](https://github.com/yukim/node_cassandra)
+
 ## Custom client and server example
 
 An example based on the one shown on the Thrift front page is included in the examples/ folder.
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index cd31a50..05c5a6c 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -22,8 +22,7 @@
     ttransport = require('./transport'),
     tprotocol = require('./protocol');
 
-var BinaryParser = require('./binary_parser').BinaryParser;
-BinaryParser.bigEndian = true;
+var binary = require('./binary');
 
 var Connection = exports.Connection = function(stream, options) {
   var self = this;
@@ -53,7 +52,12 @@
   });
 
   this.connection.addListener("error", function(err) {
-    self.emit("error", err);
+    // Only emit the error if no-one else is listening on the connection
+    // or if someone is listening on us
+    if (self.connection.listeners('error').length === 1
+        || self.listeners('error').length > 0) {
+      self.emit("error", err)
+    }
   });
 
   // Add a close listener
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index fb96353..2554a2f 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -27,6 +27,8 @@
 
 exports.createServer = require('./server').createServer;
 
+exports.Int64 = require('node-int64')
+
 /*
  * Export transport and protocol so they can be used outside of a 
  * cassandra/server context
diff --git a/lib/nodejs/lib/thrift/protocol.js b/lib/nodejs/lib/thrift/protocol.js
index 1b6c345..cbf6c9a 100644
--- a/lib/nodejs/lib/thrift/protocol.js
+++ b/lib/nodejs/lib/thrift/protocol.js
@@ -20,8 +20,8 @@
     Thrift = require('./thrift'),
     Type = Thrift.Type;
 
-var BinaryParser = require('./binary_parser').BinaryParser;
-BinaryParser.bigEndian = true;
+var binary = require('./binary'),
+    Int64 = require('node-int64');
 
 var UNKNOWN = 0,
     INVALID_DATA = 1,
@@ -122,28 +122,51 @@
 }
 
 TBinaryProtocol.prototype.writeByte = function(byte) {
-  this.trans.write(BinaryParser.fromByte(byte));
+  this.trans.write(new Buffer([byte]));
 }
 
 TBinaryProtocol.prototype.writeI16 = function(i16) {
-  this.trans.write(BinaryParser.fromShort(i16));
+  this.trans.write(binary.writeI16(new Buffer(2), i16));
 }
 
 TBinaryProtocol.prototype.writeI32 = function(i32) {
-  this.trans.write(BinaryParser.fromInt(i32));
+  this.trans.write(binary.writeI32(new Buffer(4), i32));
 }
 
 TBinaryProtocol.prototype.writeI64 = function(i64) {
-  this.trans.write(BinaryParser.fromLong(i64));
+  if (i64.buffer) {
+    this.trans.write(i64.buffer);
+  } else {
+    this.trans.write(new Int64(i64).buffer)
+  }
 }
 
 TBinaryProtocol.prototype.writeDouble = function(dub) {
-  this.trans.write(BinaryParser.fromDouble(dub));
+  this.trans.write(binary.writeDouble(new Buffer(8), dub));
 }
 
-TBinaryProtocol.prototype.writeString = function(str) {
-  this.writeI32(str.length);
-  this.trans.write(str);
+TBinaryProtocol.prototype.writeString = function(arg) {
+  if (typeof(arg) === 'string') {
+    this.writeI32(Buffer.byteLength(arg, 'utf8'))
+    this.trans.write(arg, 'utf8');
+  } else if (arg instanceof Buffer) {
+    this.writeI32(arg.length)
+    this.trans.write(arg);
+  } else {
+    throw new Error('writeString called without a string/Buffer argument: ' + arg)
+  }
+}
+
+TBinaryProtocol.prototype.writeBinary = function(arg) {
+  if (typeof(arg) === 'string') {
+    this.writeI32(Buffer.byteLength(arg, 'utf8'))
+    this.trans.write(arg, 'utf8');
+  } else if (arg instanceof Buffer) {
+    this.writeI32(arg.length)
+    this.trans.write(arg);
+  } else {
+    throw new Error('writeBinary called without a string/Buffer argument: ' + arg)
+  }
 }
 
 TBinaryProtocol.prototype.readMessageBegin = function() {
@@ -229,31 +252,24 @@
 }
 
 TBinaryProtocol.prototype.readByte = function() {
-  var buff = this.trans.read(1);
-  return BinaryParser.toByte(buff);
+  return this.trans.readByte();
 }
 
 TBinaryProtocol.prototype.readI16 = function() {
-  var buff = this.trans.read(2);
-  return BinaryParser.toShort(buff);
+  return this.trans.readI16();
 }
 
 TBinaryProtocol.prototype.readI32 = function() {
-  var buff = this.trans.read(4);
-  // console.log("read buf: ");
-  // console.log(buff);
-  // console.log("result: " + BinaryParser.toInt(buff));
-  return BinaryParser.toInt(buff);
+  return this.trans.readI32();
 }
 
 TBinaryProtocol.prototype.readI64 = function() {
   var buff = this.trans.read(8);
-  return BinaryParser.toLong(buff);
+  return new Int64(buff);
 }
 
 TBinaryProtocol.prototype.readDouble = function() {
-  var buff = this.trans.read(8);
-  return BinaryParser.toFloat(buff);
+  return this.trans.readDouble();
 }
 
 TBinaryProtocol.prototype.readBinary = function() {
@@ -262,9 +278,8 @@
 }
 
 TBinaryProtocol.prototype.readString = function() {
-  var r = this.readBinary().toString('binary');
-  // console.log("readString: " + r);
-  return r;
+  var len = this.readI32();
+  return this.trans.readString(len);
 }
 
 TBinaryProtocol.prototype.getTransport = function() {
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index abdf579..a17419b 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -18,9 +18,8 @@
  */
 var net = require('net');
 
-var ttransport = require('./transport');
-var BinaryParser = require('./binary_parser').BinaryParser,
-    TBinaryProtocol = require('./protocol').TBinaryProtocol;
+var ttransport = require('./transport')
+  , TBinaryProtocol = require('./protocol').TBinaryProtocol;
 
 exports.createServer = function(cls, handler, options) {
   if (cls.Processor) {
@@ -31,22 +30,29 @@
   var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol;
 
   return net.createServer(function(stream) {
-    stream.on('data', transport.receiver(function(transport_with_data) {
-      var input = new protocol(transport_with_data);
+    var self = this;
+    stream.on('data', transport.receiver(function(transportWithData) {
+      var input = new protocol(transportWithData);
       var output = new protocol(new transport(undefined, function(buf) {
-        stream.write(buf);
+        try {
+            stream.write(buf);
+        } catch (err) {
+            self.emit('error', err);
+            stream.end();
+        }
       }));
 
       try {
         processor.process(input, output);
-        transport_with_data.commitPosition();
+        transportWithData.commitPosition();
       }
-      catch (e) {
-        if (e instanceof ttransport.InputBufferUnderrunError) {
-          transport_with_data.rollbackPosition();
+      catch (err) {
+        if (err instanceof ttransport.InputBufferUnderrunError) {
+          transportWithData.rollbackPosition();
         }
         else {
-          throw e;
+          self.emit('error', err);
+          stream.end();
         }
       }
     }));
diff --git a/lib/nodejs/lib/thrift/transport.js b/lib/nodejs/lib/thrift/transport.js
index 06351a4..7d83a99 100644
--- a/lib/nodejs/lib/thrift/transport.js
+++ b/lib/nodejs/lib/thrift/transport.js
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-var BinaryParser = require('./binary_parser').BinaryParser;
-
 var emptyBuf = new Buffer(0);
 
+var binary = require('./binary');
+
 var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() {
 };
 
@@ -55,7 +55,7 @@
           break;
           //throw Error("Expecting > 4 bytes, found only " + data.length);
         }
-        frameLeft = BinaryParser.toInt(data.slice(0,4));
+        frameLeft = binary.readI32(data, 0);
         frame = new Buffer(frameLeft);
         framePos = 0;
         data = data.slice(4, data.length);
@@ -98,15 +98,41 @@
     return buf;
   },
 
+  readByte: function() {
+    return this.inBuf[this.readPos++];
+  },
+
+  readI16: function() {
+    var i16 = binary.readI16(this.inBuf, this.readPos);
+    this.readPos += 2;
+    return i16;
+  },
+
+  readI32: function() {
+    var i32 = binary.readI32(this.inBuf, this.readPos);
+    this.readPos += 4;
+    return i32;
+  },
+
+  readDouble: function() {
+    var d = binary.readDouble(this.inBuf, this.readPos);
+    this.readPos += 8;
+    return d;
+  },
+
+  readString: function(len) {
+    var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len);
+    this.readPos += len;
+    return str;
+  },
+
   readAll: function() {
     return this.inBuf;
   },
 
   write: function(buf, encoding) {
     if (typeof(buf) === "string") {
-      // Defaulting to ascii encoding here since that's more like the original
-      // code, but I feel like 'utf8' would be a better choice.
-      buf = new Buffer(buf, encoding || 'ascii');
+      buf = new Buffer(buf, encoding || 'utf8');
     }
     this.outBuffers.push(buf);
     this.outCount += buf.length;
@@ -123,7 +149,8 @@
     if (this.onFlush) {
       // TODO: optimize this better, allocate one buffer instead of both:
       var msg = new Buffer(out.length + 4);
-      BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4);
+      binary.writeI32(msg, out.length)
+      frameLeft = binary.readI32(this.inBuf, 0);
       out.copy(msg, 4, 0, out.length);
       this.onFlush(msg);
     }
@@ -180,16 +207,54 @@
   open: function() {},
   close: function() {},
 
-  read: function(len) {
+  ensureAvailable: function(len) {
     if (this.readCursor + len > this.writeCursor) {
       throw new InputBufferUnderrunError();
     }
+  },
+
+  read: function(len) {
+    this.ensureAvailable(len)
     var buf = new Buffer(len);
     this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
     this.readCursor += len;
     return buf;
   },
 
+  readByte: function() {
+    this.ensureAvailable(1)
+    return this.inBuf[this.readCursor++];
+  },
+
+  readI16: function() {
+    this.ensureAvailable(2)
+    var i16 = binary.readI16(this.inBuf, this.readCursor);
+    this.readCursor += 2;
+    return i16;
+  },
+
+  readI32: function() {
+    this.ensureAvailable(4)
+    var i32 = binary.readI32(this.inBuf, this.readCursor);
+    this.readCursor += 4;
+    return i32;
+  },
+
+  readDouble: function() {
+    this.ensureAvailable(8)
+    var d = binary.readDouble(this.inBuf, this.readCursor);
+    this.readCursor += 8;
+    return d;
+  },
+
+  readString: function(len) {
+    this.ensureAvailable(len)
+    var str = this.inBuf.toString('utf8', this.readCursor, this.readCursor + len);
+    this.readCursor += len;
+    return str;
+  },
+
+
   readAll: function() {
     if (this.readCursor >= this.writeCursor) {
       throw new InputBufferUnderrunError();
diff --git a/lib/nodejs/package.json b/lib/nodejs/package.json
index c2ce1b5..cb4d222 100644
--- a/lib/nodejs/package.json
+++ b/lib/nodejs/package.json
@@ -23,5 +23,8 @@
     },
   "directories" : { "lib" : "./lib/thrift" },
   "main": "./lib/thrift",
-  "engines": { "node": ">= 0.2.4" }
+  "engines": { "node": ">= 0.2.4" },
+  "dependencies": {
+    "node-int64": "0.3.x"
+  }
 }
diff --git a/test/nodejs/client.js b/test/nodejs/client.js
index 3a82a83..88460b0 100644
--- a/test/nodejs/client.js
+++ b/test/nodejs/client.js
@@ -71,14 +71,11 @@
 	passed();
 });
 
-/*
- * FATAL ERROR: CALL_AND_RETRY_2 Allocation failed - process out of memory
 client.testI64(-5, function(err, response) {
   if (err) { return failed(err); }
   console.log("testI64(-5) = ", response);
 	passed();
 });
- */
 
 client.testI64(-34359738368, function(err, response) {
   if (err) { return failed(err); }
diff --git a/test/test.sh b/test/test.sh
index 30a3709..30ca838 100755
--- a/test/test.sh
+++ b/test/test.sh
@@ -126,6 +126,10 @@
         "make -C nodejs/ client" \
         "make -C nodejs/ server" \
         "1"
+do_test "nodejs-cpp" "binary" "framed-ip" \
+        "make -C nodejs/ client" \
+        "cpp/TestServer --transport=framed" \
+        "1"
 do_test "cpp-nodejs" "binary" "framed-ip" \
         "cpp/TestClient --transport=framed" \
         "make -C nodejs/ server" \