THRIFT-2964: nodejs file breakout
Client: NodeJS
Patch: Andrew de Andrade
Moves protocols and transports (among others) into seperate files.
diff --git a/lib/nodejs/lib/thrift/binary_protocol.js b/lib/nodejs/lib/thrift/binary_protocol.js
new file mode 100644
index 0000000..a230291
--- /dev/null
+++ b/lib/nodejs/lib/thrift/binary_protocol.js
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var log = require('./log');
+var binary = require('./binary');
+var Int64 = require('node-int64');
+var Thrift = require('./thrift');
+var Type = Thrift.Type;
+
+module.exports = TBinaryProtocol;
+
+// JavaScript supports only numeric doubles, therefore even hex values are always signed.
+// The largest integer value which can be represented in JavaScript is +/-2^53.
+// Bitwise operations convert numbers to 32 bit integers but perform sign extension
+// upon assigning values back to variables.
+var VERSION_MASK = -65536, // 0xffff0000
+ VERSION_1 = -2147418112, // 0x80010000
+ TYPE_MASK = 0x000000ff;
+
+function TBinaryProtocol(trans, strictRead, strictWrite) {
+ this.trans = trans;
+ this.strictRead = (strictRead !== undefined ? strictRead : false);
+ this.strictWrite = (strictWrite !== undefined ? strictWrite : true);
+};
+
+TBinaryProtocol.prototype.flush = function() {
+ return this.trans.flush();
+};
+
+TBinaryProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
+ if (this.strictWrite) {
+ this.writeI32(VERSION_1 | type);
+ this.writeString(name);
+ this.writeI32(seqid);
+ } else {
+ this.writeString(name);
+ this.writeByte(type);
+ this.writeI32(seqid);
+ }
+ // Record client seqid to find callback again
+ if (this._seqid) {
+ // TODO better logging log warning
+ log.warning('SeqId already set', { 'name': name });
+ } else {
+ this._seqid = seqid;
+ this.trans.setCurrSeqId(seqid);
+ }
+};
+
+TBinaryProtocol.prototype.writeMessageEnd = function() {
+ if (this._seqid) {
+ this._seqid = null;
+ } else {
+ log.warning('No seqid to unset');
+ }
+};
+
+TBinaryProtocol.prototype.writeStructBegin = function(name) {
+};
+
+TBinaryProtocol.prototype.writeStructEnd = function() {
+};
+
+TBinaryProtocol.prototype.writeFieldBegin = function(name, type, id) {
+ this.writeByte(type);
+ this.writeI16(id);
+};
+
+TBinaryProtocol.prototype.writeFieldEnd = function() {
+};
+
+TBinaryProtocol.prototype.writeFieldStop = function() {
+ this.writeByte(Type.STOP);
+};
+
+TBinaryProtocol.prototype.writeMapBegin = function(ktype, vtype, size) {
+ this.writeByte(ktype);
+ this.writeByte(vtype);
+ this.writeI32(size);
+};
+
+TBinaryProtocol.prototype.writeMapEnd = function() {
+};
+
+TBinaryProtocol.prototype.writeListBegin = function(etype, size) {
+ this.writeByte(etype);
+ this.writeI32(size);
+};
+
+TBinaryProtocol.prototype.writeListEnd = function() {
+};
+
+TBinaryProtocol.prototype.writeSetBegin = function(etype, size) {
+ this.writeByte(etype);
+ this.writeI32(size);
+};
+
+TBinaryProtocol.prototype.writeSetEnd = function() {
+};
+
+TBinaryProtocol.prototype.writeBool = function(bool) {
+ if (bool) {
+ this.writeByte(1);
+ } else {
+ this.writeByte(0);
+ }
+};
+
+TBinaryProtocol.prototype.writeByte = function(b) {
+ this.trans.write(new Buffer([b]));
+};
+
+TBinaryProtocol.prototype.writeI16 = function(i16) {
+ this.trans.write(binary.writeI16(new Buffer(2), i16));
+};
+
+TBinaryProtocol.prototype.writeI32 = function(i32) {
+ this.trans.write(binary.writeI32(new Buffer(4), i32));
+};
+
+TBinaryProtocol.prototype.writeI64 = function(i64) {
+ if (i64.buffer) {
+ this.trans.write(i64.buffer);
+ } else {
+ this.trans.write(new Int64(i64).buffer);
+ }
+};
+
+TBinaryProtocol.prototype.writeDouble = function(dub) {
+ this.trans.write(binary.writeDouble(new Buffer(8), dub));
+};
+
+TBinaryProtocol.prototype.writeString = function(arg) {
+ if (typeof(arg) === 'string') {
+ this.writeI32(Buffer.byteLength(arg, 'utf8'));
+ this.trans.write(arg, 'utf8');
+ } else if (arg instanceof Buffer) {
+ this.writeI32(arg.length);
+ this.trans.write(arg);
+ } else {
+ throw new Error('writeString called without a string/Buffer argument: ' + arg);
+ }
+};
+
+TBinaryProtocol.prototype.writeBinary = function(arg) {
+ if (typeof(arg) === 'string') {
+ this.writeI32(Buffer.byteLength(arg, 'utf8'));
+ this.trans.write(arg, 'utf8');
+ } else if ((arg instanceof Buffer) ||
+ (Object.prototype.toString.call(arg) == '[object Uint8Array]')) {
+ // Buffers in Node.js under Browserify may extend UInt8Array instead of
+ // defining a new object. We detect them here so we can write them
+ // correctly
+ this.writeI32(arg.length);
+ this.trans.write(arg);
+ } else {
+ throw new Error('writeBinary called without a string/Buffer argument: ' + arg);
+ }
+};
+
+TBinaryProtocol.prototype.readMessageBegin = function() {
+ var sz = this.readI32();
+ var type, name, seqid;
+
+ if (sz < 0) {
+ var version = sz & VERSION_MASK;
+ if (version != VERSION_1) {
+ console.log("BAD: " + version);
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad version in readMessageBegin: " + sz);
+ }
+ type = sz & TYPE_MASK;
+ name = this.readString();
+ seqid = this.readI32();
+ } else {
+ if (this.strictRead) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "No protocol version header");
+ }
+ name = this.trans.read(sz);
+ type = this.readByte();
+ seqid = this.readI32();
+ }
+ return {fname: name, mtype: type, rseqid: seqid};
+};
+
+TBinaryProtocol.prototype.readMessageEnd = function() {
+};
+
+TBinaryProtocol.prototype.readStructBegin = function() {
+ return {fname: ''};
+};
+
+TBinaryProtocol.prototype.readStructEnd = function() {
+};
+
+TBinaryProtocol.prototype.readFieldBegin = function() {
+ var type = this.readByte();
+ if (type == Type.STOP) {
+ return {fname: null, ftype: type, fid: 0};
+ }
+ var id = this.readI16();
+ return {fname: null, ftype: type, fid: id};
+};
+
+TBinaryProtocol.prototype.readFieldEnd = function() {
+};
+
+TBinaryProtocol.prototype.readMapBegin = function() {
+ var ktype = this.readByte();
+ var vtype = this.readByte();
+ var size = this.readI32();
+ return {ktype: ktype, vtype: vtype, size: size};
+};
+
+TBinaryProtocol.prototype.readMapEnd = function() {
+};
+
+TBinaryProtocol.prototype.readListBegin = function() {
+ var etype = this.readByte();
+ var size = this.readI32();
+ return {etype: etype, size: size};
+};
+
+TBinaryProtocol.prototype.readListEnd = function() {
+};
+
+TBinaryProtocol.prototype.readSetBegin = function() {
+ var etype = this.readByte();
+ var size = this.readI32();
+ return {etype: etype, size: size};
+};
+
+TBinaryProtocol.prototype.readSetEnd = function() {
+};
+
+TBinaryProtocol.prototype.readBool = function() {
+ var b = this.readByte();
+ if (b === 0) {
+ return false;
+ }
+ return true;
+};
+
+TBinaryProtocol.prototype.readByte = function() {
+ return this.trans.readByte();
+};
+
+TBinaryProtocol.prototype.readI16 = function() {
+ return this.trans.readI16();
+};
+
+TBinaryProtocol.prototype.readI32 = function() {
+ return this.trans.readI32();
+};
+
+TBinaryProtocol.prototype.readI64 = function() {
+ var buff = this.trans.read(8);
+ return new Int64(buff);
+};
+
+TBinaryProtocol.prototype.readDouble = function() {
+ return this.trans.readDouble();
+};
+
+TBinaryProtocol.prototype.readBinary = function() {
+ var len = this.readI32();
+ return this.trans.read(len);
+};
+
+TBinaryProtocol.prototype.readString = function() {
+ var len = this.readI32();
+ return this.trans.readString(len);
+};
+
+TBinaryProtocol.prototype.getTransport = function() {
+ return this.trans;
+};
+
+TBinaryProtocol.prototype.skip = function(type) {
+ switch (type) {
+ case Type.STOP:
+ return;
+ case Type.BOOL:
+ this.readBool();
+ break;
+ case Type.BYTE:
+ this.readByte();
+ break;
+ case Type.I16:
+ this.readI16();
+ break;
+ case Type.I32:
+ this.readI32();
+ break;
+ case Type.I64:
+ this.readI64();
+ break;
+ case Type.DOUBLE:
+ this.readDouble();
+ break;
+ case Type.STRING:
+ this.readString();
+ break;
+ case Type.STRUCT:
+ this.readStructBegin();
+ while (true) {
+ var r = this.readFieldBegin();
+ if (r.ftype === Type.STOP) {
+ break;
+ }
+ this.skip(r.ftype);
+ this.readFieldEnd();
+ }
+ this.readStructEnd();
+ break;
+ case Type.MAP:
+ var mapBegin = this.readMapBegin();
+ for (var i = 0; i < mapBegin.size; ++i) {
+ this.skip(mapBegin.ktype);
+ this.skip(mapBegin.vtype);
+ }
+ this.readMapEnd();
+ break;
+ case Type.SET:
+ var setBegin = this.readSetBegin();
+ for (var i2 = 0; i2 < setBegin.size; ++i2) {
+ this.skip(setBegin.etype);
+ }
+ this.readSetEnd();
+ break;
+ case Type.LIST:
+ var listBegin = this.readListBegin();
+ for (var i3 = 0; i3 < listBegin.size; ++i3) {
+ this.skip(listBegin.etype);
+ }
+ this.readListEnd();
+ break;
+ default:
+ throw new Error("Invalid type: " + type);
+ }
+};
diff --git a/lib/nodejs/lib/thrift/buffered_transport.js b/lib/nodejs/lib/thrift/buffered_transport.js
new file mode 100644
index 0000000..13636b5
--- /dev/null
+++ b/lib/nodejs/lib/thrift/buffered_transport.js
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var binary = require('./binary');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+module.exports = TBufferedTransport;
+
+function TBufferedTransport(buffer, callback) {
+ this.defaultReadBufferSize = 1024;
+ this.writeBufferSize = 512; // Soft Limit
+ this.inBuf = new Buffer(this.defaultReadBufferSize);
+ this.readCursor = 0;
+ this.writeCursor = 0; // for input buffer
+ this.outBuffers = [];
+ this.outCount = 0;
+ this.onFlush = callback;
+};
+
+TBufferedTransport.receiver = function(callback, seqid) {
+ var reader = new TBufferedTransport();
+
+ return function(data) {
+ if (reader.writeCursor + data.length > reader.inBuf.length) {
+ var buf = new Buffer(reader.writeCursor + data.length);
+ reader.inBuf.copy(buf, 0, 0, reader.writeCursor);
+ reader.inBuf = buf;
+ }
+ data.copy(reader.inBuf, reader.writeCursor, 0);
+ reader.writeCursor += data.length;
+
+ callback(reader, seqid);
+ };
+};
+
+
+TBufferedTransport.prototype.commitPosition = function(){
+ var unreadSize = this.writeCursor - this.readCursor;
+ var bufSize = (unreadSize * 2 > this.defaultReadBufferSize) ?
+ unreadSize * 2 : this.defaultReadBufferSize;
+ var buf = new Buffer(bufSize);
+ if (unreadSize > 0) {
+ this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor);
+ }
+ this.readCursor = 0;
+ this.writeCursor = unreadSize;
+ this.inBuf = buf;
+};
+
+TBufferedTransport.prototype.rollbackPosition = function(){
+ this.readCursor = 0;
+}
+
+ // TODO: Implement open/close support
+TBufferedTransport.prototype.isOpen = function() {
+ return true;
+};
+
+TBufferedTransport.prototype.open = function() {
+};
+
+TBufferedTransport.prototype.close = function() {
+};
+
+ // Set the seqid of the message in the client
+ // So that callbacks can be found
+TBufferedTransport.prototype.setCurrSeqId = function(seqid) {
+ this._seqid = seqid;
+};
+
+TBufferedTransport.prototype.ensureAvailable = function(len) {
+ if (this.readCursor + len > this.writeCursor) {
+ throw new InputBufferUnderrunError();
+ }
+};
+
+TBufferedTransport.prototype.read = function(len) {
+ this.ensureAvailable(len);
+ var buf = new Buffer(len);
+ this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
+ this.readCursor += len;
+ return buf;
+};
+
+TBufferedTransport.prototype.readByte = function() {
+ this.ensureAvailable(1);
+ return binary.readByte(this.inBuf[this.readCursor++]);
+};
+
+TBufferedTransport.prototype.readI16 = function() {
+ this.ensureAvailable(2);
+ var i16 = binary.readI16(this.inBuf, this.readCursor);
+ this.readCursor += 2;
+ return i16;
+};
+
+TBufferedTransport.prototype.readI32 = function() {
+ this.ensureAvailable(4);
+ var i32 = binary.readI32(this.inBuf, this.readCursor);
+ this.readCursor += 4;
+ return i32;
+};
+
+TBufferedTransport.prototype.readDouble = function() {
+ this.ensureAvailable(8);
+ var d = binary.readDouble(this.inBuf, this.readCursor);
+ this.readCursor += 8;
+ return d;
+};
+
+TBufferedTransport.prototype.readString = function(len) {
+ this.ensureAvailable(len);
+ var str = this.inBuf.toString('utf8', this.readCursor, this.readCursor + len);
+ this.readCursor += len;
+ return str;
+};
+
+TBufferedTransport.prototype.borrow = function() {
+ var obj = {buf: this.inBuf, readIndex: this.readCursor, writeIndex: this.writeCursor};
+ return obj;
+};
+
+TBufferedTransport.prototype.consume = function(bytesConsumed) {
+ this.readCursor += bytesConsumed;
+};
+
+TBufferedTransport.prototype.write = function(buf) {
+ if (typeof(buf) === "string") {
+ buf = new Buffer(buf, 'utf8');
+ }
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+};
+
+TBufferedTransport.prototype.flush = function() {
+ // If the seqid of the callback is available pass it to the onFlush
+ // Then remove the current seqid
+ var seqid = this._seqid;
+ this._seqid = null;
+
+ if (this.outCount < 1) {
+ return;
+ }
+
+ var msg = new Buffer(this.outCount),
+ pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(msg, pos, 0);
+ pos += buf.length;
+ });
+
+ if (this.onFlush) {
+ // Passing seqid through this call to get it to the connection
+ this.onFlush(msg, seqid);
+ }
+
+ this.outBuffers = [];
+ this.outCount = 0;
+}
diff --git a/lib/nodejs/lib/thrift/compact_protocol.js b/lib/nodejs/lib/thrift/compact_protocol.js
new file mode 100644
index 0000000..45d62f4
--- /dev/null
+++ b/lib/nodejs/lib/thrift/compact_protocol.js
@@ -0,0 +1,907 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var log = require('./log');
+var Int64 = require('node-int64');
+var Thrift = require('./thrift');
+var Type = Thrift.Type;
+
+module.exports = TCompactProtocol;
+
+var POW_8 = Math.pow(2, 8);
+var POW_24 = Math.pow(2, 24);
+var POW_32 = Math.pow(2, 32);
+var POW_40 = Math.pow(2, 40);
+var POW_48 = Math.pow(2, 48);
+var POW_52 = Math.pow(2, 52);
+var POW_1022 = Math.pow(2, 1022);
+
+/**
+ * Constructor Function for the Compact Protocol.
+ * @constructor
+ * @param {object} [trans] - The underlying transport to read/write.
+ * @classdesc The Apache Thrift Protocol layer performs serialization
+ * of base types, the compact protocol serializes data in binary
+ * form with minimal space used for scalar values.
+ */
+function TCompactProtocol(trans) {
+ this.trans = trans;
+ this.lastField_ = [];
+ this.lastFieldId_ = 0;
+ this.string_limit_ = 0;
+ this.string_buf_ = null;
+ this.string_buf_size_ = 0;
+ this.container_limit_ = 0;
+ this.booleanField_ = {
+ name: null,
+ hasBoolValue: false
+ };
+ this.boolValue_ = {
+ hasBoolValue: false,
+ boolValue: false
+ };
+};
+
+
+//
+// Compact Protocol Constants
+//
+
+/**
+ * Compact Protocol ID number.
+ * @readonly
+ * @const {number} PROTOCOL_ID
+ */
+TCompactProtocol.PROTOCOL_ID = -126; //1000 0010
+
+/**
+ * Compact Protocol version number.
+ * @readonly
+ * @const {number} VERSION_N
+ */
+TCompactProtocol.VERSION_N = 1;
+
+/**
+ * Compact Protocol version mask for combining protocol version and message type in one byte.
+ * @readonly
+ * @const {number} VERSION_MASK
+ */
+TCompactProtocol.VERSION_MASK = 0x1f; //0001 1111
+
+/**
+ * Compact Protocol message type mask for combining protocol version and message type in one byte.
+ * @readonly
+ * @const {number} TYPE_MASK
+ */
+TCompactProtocol.TYPE_MASK = -32; //1110 0000
+
+/**
+ * Compact Protocol message type bits for ensuring message type bit size.
+ * @readonly
+ * @const {number} TYPE_BITS
+ */
+TCompactProtocol.TYPE_BITS = 7; //0000 0111
+
+/**
+ * Compact Protocol message type shift amount for combining protocol version and message type in one byte.
+ * @readonly
+ * @const {number} TYPE_SHIFT_AMOUNT
+ */
+TCompactProtocol.TYPE_SHIFT_AMOUNT = 5;
+
+/**
+ * Compact Protocol type IDs used to keep type data within one nibble.
+ * @readonly
+ * @property {number} CT_STOP - End of a set of fields.
+ * @property {number} CT_BOOLEAN_TRUE - Flag for Boolean field with true value (packed field and value).
+ * @property {number} CT_BOOLEAN_FALSE - Flag for Boolean field with false value (packed field and value).
+ * @property {number} CT_BYTE - Signed 8 bit integer.
+ * @property {number} CT_I16 - Signed 16 bit integer.
+ * @property {number} CT_I32 - Signed 32 bit integer.
+ * @property {number} CT_I64 - Signed 64 bit integer (2^53 max in JavaScript).
+ * @property {number} CT_DOUBLE - 64 bit IEEE 854 floating point.
+ * @property {number} CT_BINARY - Array of bytes (used for strings also).
+ * @property {number} CT_LIST - A collection type (unordered).
+ * @property {number} CT_SET - A collection type (unordered and without repeated values).
+ * @property {number} CT_MAP - A collection type (map/associative-array/dictionary).
+ * @property {number} CT_STRUCT - A multifield type.
+ */
+TCompactProtocol.Types = {
+ CT_STOP: 0x00,
+ CT_BOOLEAN_TRUE: 0x01,
+ CT_BOOLEAN_FALSE: 0x02,
+ CT_BYTE: 0x03,
+ CT_I16: 0x04,
+ CT_I32: 0x05,
+ CT_I64: 0x06,
+ CT_DOUBLE: 0x07,
+ CT_BINARY: 0x08,
+ CT_LIST: 0x09,
+ CT_SET: 0x0A,
+ CT_MAP: 0x0B,
+ CT_STRUCT: 0x0C
+};
+
+/**
+ * Array mapping Compact type IDs to standard Thrift type IDs.
+ * @readonly
+ */
+TCompactProtocol.TTypeToCType = [
+ TCompactProtocol.Types.CT_STOP, // T_STOP
+ 0, // unused
+ TCompactProtocol.Types.CT_BOOLEAN_TRUE, // T_BOOL
+ TCompactProtocol.Types.CT_BYTE, // T_BYTE
+ TCompactProtocol.Types.CT_DOUBLE, // T_DOUBLE
+ 0, // unused
+ TCompactProtocol.Types.CT_I16, // T_I16
+ 0, // unused
+ TCompactProtocol.Types.CT_I32, // T_I32
+ 0, // unused
+ TCompactProtocol.Types.CT_I64, // T_I64
+ TCompactProtocol.Types.CT_BINARY, // T_STRING
+ TCompactProtocol.Types.CT_STRUCT, // T_STRUCT
+ TCompactProtocol.Types.CT_MAP, // T_MAP
+ TCompactProtocol.Types.CT_SET, // T_SET
+ TCompactProtocol.Types.CT_LIST, // T_LIST
+];
+
+
+//
+// Compact Protocol Utilities
+//
+
+/**
+ * Returns the underlying transport layer.
+ * @return {object} The underlying transport layer.
+ */TCompactProtocol.prototype.getTransport = function() {
+ return this.trans;
+};
+
+/**
+ * Lookup a Compact Protocol Type value for a given Thrift Type value.
+ * N.B. Used only internally.
+ * @param {number} ttype - Thrift type value
+ * @returns {number} Compact protocol type value
+ */
+TCompactProtocol.prototype.getCompactType = function(ttype) {
+ return TCompactProtocol.TTypeToCType[ttype];
+};
+
+/**
+ * Lookup a Thrift Type value for a given Compact Protocol Type value.
+ * N.B. Used only internally.
+ * @param {number} type - Compact Protocol type value
+ * @returns {number} Thrift Type value
+ */
+TCompactProtocol.prototype.getTType = function(type) {
+ switch (type) {
+ case Type.STOP:
+ return Type.STOP;
+ case TCompactProtocol.Types.CT_BOOLEAN_FALSE:
+ case TCompactProtocol.Types.CT_BOOLEAN_TRUE:
+ return Type.BOOL;
+ case TCompactProtocol.Types.CT_BYTE:
+ return Type.BYTE;
+ case TCompactProtocol.Types.CT_I16:
+ return Type.I16;
+ case TCompactProtocol.Types.CT_I32:
+ return Type.I32;
+ case TCompactProtocol.Types.CT_I64:
+ return Type.I64;
+ case TCompactProtocol.Types.CT_DOUBLE:
+ return Type.DOUBLE;
+ case TCompactProtocol.Types.CT_BINARY:
+ return Type.STRING;
+ case TCompactProtocol.Types.CT_LIST:
+ return Type.LIST;
+ case TCompactProtocol.Types.CT_SET:
+ return Type.SET;
+ case TCompactProtocol.Types.CT_MAP:
+ return Type.MAP;
+ case TCompactProtocol.Types.CT_STRUCT:
+ return Type.STRUCT;
+ default:
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Unknown type: " + type);
+ }
+ return Type.STOP;
+};
+
+
+//
+// Compact Protocol write operations
+//
+
+/**
+ * Send any buffered bytes to the end point.
+ */
+TCompactProtocol.prototype.flush = function() {
+ return this.trans.flush();
+};
+
+/**
+ * Writes an RPC message header
+ * @param {string} name - The method name for the message.
+ * @param {number} type - The type of message (CALL, REPLY, EXCEPTION, ONEWAY).
+ * @param {number} seqid - The call sequence number (if any).
+ */
+TCompactProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
+ this.writeByte(TCompactProtocol.PROTOCOL_ID);
+ this.writeByte((TCompactProtocol.VERSION_N & TCompactProtocol.VERSION_MASK) |
+ ((type << TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_MASK));
+ this.writeVarint32(seqid);
+ this.writeString(name);
+
+ // Record client seqid to find callback again
+ if (this._seqid) {
+ // TODO better logging log warning
+ log.warning('SeqId already set', { 'name': name });
+ } else {
+ this._seqid = seqid;
+ this.trans.setCurrSeqId(seqid);
+ }
+};
+
+TCompactProtocol.prototype.writeMessageEnd = function() {
+};
+
+TCompactProtocol.prototype.writeStructBegin = function(name) {
+ this.lastField_.push(this.lastFieldId_);
+ this.lastFieldId_ = 0;
+};
+
+TCompactProtocol.prototype.writeStructEnd = function() {
+ this.lastFieldId_ = this.lastField_.pop();
+};
+
+/**
+ * Writes a struct field header
+ * @param {string} name - The field name (not written with the compact protocol).
+ * @param {number} type - The field data type (a normal Thrift field Type).
+ * @param {number} id - The IDL field Id.
+ */
+TCompactProtocol.prototype.writeFieldBegin = function(name, type, id) {
+ if (type != Type.BOOL) {
+ return this.writeFieldBeginInternal(name, type, id, -1);
+ }
+
+ this.booleanField_.name = name;
+ this.booleanField_.fieldType = type;
+ this.booleanField_.fieldId = id;
+};
+
+TCompactProtocol.prototype.writeFieldEnd = function() {
+};
+
+TCompactProtocol.prototype.writeFieldStop = function() {
+ this.writeByte(TCompactProtocol.Types.CT_STOP);
+};
+
+/**
+ * Writes a map collection header
+ * @param {number} keyType - The Thrift type of the map keys.
+ * @param {number} valType - The Thrift type of the map values.
+ * @param {number} size - The number of k/v pairs in the map.
+ */
+TCompactProtocol.prototype.writeMapBegin = function(keyType, valType, size) {
+ if (size === 0) {
+ this.writeByte(0);
+ } else {
+ this.writeVarint32(size);
+ this.writeByte(this.getCompactType(keyType) << 4 | this.getCompactType(valType));
+ }
+};
+
+TCompactProtocol.prototype.writeMapEnd = function() {
+};
+
+/**
+ * Writes a list collection header
+ * @param {number} elemType - The Thrift type of the list elements.
+ * @param {number} size - The number of elements in the list.
+ */
+TCompactProtocol.prototype.writeListBegin = function(elemType, size) {
+ this.writeCollectionBegin(elemType, size);
+};
+
+TCompactProtocol.prototype.writeListEnd = function() {
+};
+
+/**
+ * Writes a set collection header
+ * @param {number} elemType - The Thrift type of the set elements.
+ * @param {number} size - The number of elements in the set.
+ */
+TCompactProtocol.prototype.writeSetBegin = function(elemType, size) {
+ this.writeCollectionBegin(elemType, size);
+};
+
+TCompactProtocol.prototype.writeSetEnd = function() {
+};
+
+TCompactProtocol.prototype.writeBool = function(value) {
+ if (this.booleanField_.name !== null) {
+ // we haven't written the field header yet
+ this.writeFieldBeginInternal(this.booleanField_.name,
+ this.booleanField_.fieldType,
+ this.booleanField_.fieldId,
+ (value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE
+ : TCompactProtocol.Types.CT_BOOLEAN_FALSE));
+ this.booleanField_.name = null;
+ } else {
+ // we're not part of a field, so just write the value
+ this.writeByte((value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE
+ : TCompactProtocol.Types.CT_BOOLEAN_FALSE));
+ }
+};
+
+TCompactProtocol.prototype.writeByte = function(b) {
+ this.trans.write(new Buffer([b]));
+};
+
+TCompactProtocol.prototype.writeI16 = function(i16) {
+ this.writeVarint32(this.i32ToZigzag(i16));
+};
+
+TCompactProtocol.prototype.writeI32 = function(i32) {
+ this.writeVarint32(this.i32ToZigzag(i32));
+};
+
+TCompactProtocol.prototype.writeI64 = function(i64) {
+ this.writeVarint64(this.i64ToZigzag(i64));
+};
+
+// Little-endian, unlike TBinaryProtocol
+TCompactProtocol.prototype.writeDouble = function(v) {
+ var buff = new Buffer(8);
+ var m, e, c;
+
+ buff[7] = (v < 0 ? 0x80 : 0x00);
+
+ v = Math.abs(v);
+ if (v !== v) {
+ // NaN, use QNaN IEEE format
+ m = 2251799813685248;
+ e = 2047;
+ } else if (v === Infinity) {
+ m = 0;
+ e = 2047;
+ } else {
+ e = Math.floor(Math.log(v) / Math.LN2);
+ c = Math.pow(2, -e);
+ if (v * c < 1) {
+ e--;
+ c *= 2;
+ }
+
+ if (e + 1023 >= 2047)
+ {
+ // Overflow
+ m = 0;
+ e = 2047;
+ }
+ else if (e + 1023 >= 1)
+ {
+ // Normalized - term order matters, as Math.pow(2, 52-e) and v*Math.pow(2, 52) can overflow
+ m = (v*c-1) * POW_52;
+ e += 1023;
+ }
+ else
+ {
+ // Denormalized - also catches the '0' case, somewhat by chance
+ m = (v * POW_1022) * POW_52;
+ e = 0;
+ }
+ }
+
+ buff[6] = (e << 4) & 0xf0;
+ buff[7] |= (e >> 4) & 0x7f;
+
+ buff[0] = m & 0xff;
+ m = Math.floor(m / POW_8);
+ buff[1] = m & 0xff;
+ m = Math.floor(m / POW_8);
+ buff[2] = m & 0xff;
+ m = Math.floor(m / POW_8);
+ buff[3] = m & 0xff;
+ m >>= 8;
+ buff[4] = m & 0xff;
+ m >>= 8;
+ buff[5] = m & 0xff;
+ m >>= 8;
+ buff[6] |= m & 0x0f;
+
+ this.trans.write(buff);
+};
+
+TCompactProtocol.prototype.writeString = function(arg) {
+ this.writeBinary(arg);
+};
+
+TCompactProtocol.prototype.writeBinary = function(arg) {
+ if (typeof arg === 'string') {
+ this.writeVarint32(Buffer.byteLength(arg, 'utf8')) ;
+ this.trans.write(arg, 'utf8');
+ } else if (arg instanceof Buffer) {
+ this.writeVarint32(arg.length);
+ this.trans.write(arg);
+ } else {
+ throw new Error('writeString/writeBinary called without a string/Buffer argument: ' + arg);
+ }
+};
+
+
+//
+// Compact Protocol internal write methods
+//
+
+TCompactProtocol.prototype.writeFieldBeginInternal = function(name,
+ fieldType,
+ fieldId,
+ typeOverride) {
+ //If there's a type override, use that.
+ var typeToWrite = (typeOverride == -1 ? this.getCompactType(fieldType) : typeOverride);
+ //Check if we can delta encode the field id
+ if (fieldId > this.lastFieldId_ && fieldId - this.lastFieldId_ <= 15) {
+ //Include the type delta with the field ID
+ this.writeByte((fieldId - this.lastFieldId_) << 4 | typeToWrite);
+ } else {
+ //Write separate type and ID values
+ this.writeByte(typeToWrite);
+ this.writeI16(fieldId);
+ }
+ this.lastFieldId_ = fieldId;
+};
+
+TCompactProtocol.prototype.writeCollectionBegin = function(elemType, size) {
+ if (size <= 14) {
+ //Combine size and type in one byte if possible
+ this.writeByte(size << 4 | this.getCompactType(elemType));
+ } else {
+ this.writeByte(0xf0 | this.getCompactType(elemType));
+ this.writeVarint32(size);
+ }
+};
+
+/**
+ * Write an i32 as a varint. Results in 1-5 bytes on the wire.
+ */
+TCompactProtocol.prototype.writeVarint32 = function(n) {
+ var buf = new Buffer(5);
+ var wsize = 0;
+ while (true) {
+ if ((n & ~0x7F) === 0) {
+ buf[wsize++] = n;
+ break;
+ } else {
+ buf[wsize++] = ((n & 0x7F) | 0x80);
+ n = n >>> 7;
+ }
+ }
+ var wbuf = new Buffer(wsize);
+ buf.copy(wbuf,0,0,wsize);
+ this.trans.write(wbuf);
+};
+
+/**
+ * Write an i64 as a varint. Results in 1-10 bytes on the wire.
+ * N.B. node-int64 is always big endian
+ */
+TCompactProtocol.prototype.writeVarint64 = function(n) {
+ if (typeof n === "number"){
+ n = new Int64(n);
+ }
+ if (! (n instanceof Int64)) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + n);
+ }
+
+ var buf = new Buffer(10);
+ var wsize = 0;
+ var hi = n.buffer.readUInt32BE(0, true);
+ var lo = n.buffer.readUInt32BE(4, true);
+ var mask = 0;
+ while (true) {
+ if (((lo & ~0x7F) === 0) && (hi === 0)) {
+ buf[wsize++] = lo;
+ break;
+ } else {
+ buf[wsize++] = ((lo & 0x7F) | 0x80);
+ mask = hi << 25;
+ lo = lo >>> 7;
+ hi = hi >>> 7;
+ lo = lo | mask;
+ }
+ }
+ var wbuf = new Buffer(wsize);
+ buf.copy(wbuf,0,0,wsize);
+ this.trans.write(wbuf);
+};
+
+/**
+ * Convert l into a zigzag long. This allows negative numbers to be
+ * represented compactly as a varint.
+ */
+TCompactProtocol.prototype.i64ToZigzag = function(l) {
+ if (typeof l === 'string') {
+ l = new Int64(parseInt(l, 10));
+ } else if (typeof l === 'number') {
+ l = new Int64(l);
+ }
+ if (! (l instanceof Int64)) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + l);
+ }
+ var hi = l.buffer.readUInt32BE(0, true);
+ var lo = l.buffer.readUInt32BE(4, true);
+ var sign = hi >>> 31;
+ hi = ((hi << 1) | (lo >>> 31)) ^ ((!!sign) ? 0xFFFFFFFF : 0);
+ lo = (lo << 1) ^ ((!!sign) ? 0xFFFFFFFF : 0);
+ return new Int64(hi, lo);
+};
+
+/**
+ * Convert n into a zigzag int. This allows negative numbers to be
+ * represented compactly as a varint.
+ */
+TCompactProtocol.prototype.i32ToZigzag = function(n) {
+ return (n << 1) ^ ((n & 0x80000000) ? 0xFFFFFFFF : 0);
+};
+
+
+//
+// Compact Protocol read operations
+//
+
+TCompactProtocol.prototype.readMessageBegin = function() {
+ //Read protocol ID
+ var protocolId = this.trans.readByte();
+ if (protocolId != TCompactProtocol.PROTOCOL_ID) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol identifier " + protocolId);
+ }
+
+ //Read Version and Type
+ var versionAndType = this.trans.readByte();
+ var version = (versionAndType & TCompactProtocol.VERSION_MASK);
+ if (version != TCompactProtocol.VERSION_N) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol version " + version);
+ }
+ var type = ((versionAndType >> TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_BITS);
+
+ //Read SeqId
+ var seqid = this.readVarint32();
+
+ //Read name
+ var name = this.readString();
+
+ return {fname: name, mtype: type, rseqid: seqid};
+};
+
+TCompactProtocol.prototype.readMessageEnd = function() {
+};
+
+TCompactProtocol.prototype.readStructBegin = function() {
+ this.lastField_.push(this.lastFieldId_);
+ this.lastFieldId_ = 0;
+ return {fname: ''};
+};
+
+TCompactProtocol.prototype.readStructEnd = function() {
+ this.lastFieldId_ = this.lastField_.pop();
+};
+
+TCompactProtocol.prototype.readFieldBegin = function() {
+ var fieldId = 0;
+ var b = this.trans.readByte(b);
+ var type = (b & 0x0f);
+
+ if (type == TCompactProtocol.Types.CT_STOP) {
+ return {fname: null, ftype: Thrift.Type.STOP, fid: 0};
+ }
+
+ //Mask off the 4 MSB of the type header to check for field id delta.
+ var modifier = ((b & 0x000000f0) >>> 4);
+ if (modifier === 0) {
+ //If not a delta read the field id.
+ fieldId = this.readI16();
+ } else {
+ //Recover the field id from the delta
+ fieldId = (this.lastFieldId_ + modifier);
+ }
+ var fieldType = this.getTType(type);
+
+ //Boolean are encoded with the type
+ if (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE ||
+ type == TCompactProtocol.Types.CT_BOOLEAN_FALSE) {
+ this.boolValue_.hasBoolValue = true;
+ this.boolValue_.boolValue =
+ (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE ? true : false);
+ }
+
+ //Save the new field for the next delta computation.
+ this.lastFieldId_ = fieldId;
+ return {fname: null, ftype: fieldType, fid: fieldId};
+};
+
+TCompactProtocol.prototype.readFieldEnd = function() {
+};
+
+TCompactProtocol.prototype.readMapBegin = function() {
+ var msize = this.readVarint32();
+ if (msize < 0) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative map size");
+ }
+
+ var kvType = 0;
+ if (msize !== 0) {
+ kvType = this.trans.readByte();
+ }
+
+ var keyType = this.getTType((kvType & 0xf0) >>> 4);
+ var valType = this.getTType(kvType & 0xf);
+ return {ktype: keyType, vtype: valType, size: msize};
+};
+
+TCompactProtocol.prototype.readMapEnd = function() {
+};
+
+TCompactProtocol.prototype.readListBegin = function() {
+ var size_and_type = this.trans.readByte();
+
+ var lsize = (size_and_type >>> 4) & 0x0000000f;
+ if (lsize == 15) {
+ lsize = this.readVarint32();
+ }
+
+ if (lsize < 0) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative list size");
+ }
+
+ var elemType = this.getTType(size_and_type & 0x0000000f);
+
+ return {etype: elemType, size: lsize};
+};
+
+TCompactProtocol.prototype.readListEnd = function() {
+};
+
+TCompactProtocol.prototype.readSetBegin = function() {
+ return this.readListBegin();
+};
+
+TCompactProtocol.prototype.readSetEnd = function() {
+};
+
+TCompactProtocol.prototype.readBool = function() {
+ var value = false;
+ var rsize = 0;
+ if (this.boolValue_.hasBoolValue === true) {
+ value = this.boolValue_.boolValue;
+ this.boolValue_.hasBoolValue = false;
+ } else {
+ var res = this.trans.readByte();
+ rsize = res.rsize;
+ value = (res.value == TCompactProtocol.Types.CT_BOOLEAN_TRUE);
+ }
+ return value;
+};
+
+TCompactProtocol.prototype.readByte = function() {
+ return this.trans.readByte();
+};
+
+TCompactProtocol.prototype.readI16 = function() {
+ return this.readI32();
+};
+
+TCompactProtocol.prototype.readI32 = function() {
+ return this.zigzagToI32(this.readVarint32());
+};
+
+TCompactProtocol.prototype.readI64 = function() {
+ return this.zigzagToI64(this.readVarint64());
+};
+
+// Little-endian, unlike TBinaryProtocol
+TCompactProtocol.prototype.readDouble = function() {
+ var buff = this.trans.read(8);
+ var off = 0;
+
+ var signed = buff[off + 7] & 0x80;
+ var e = (buff[off+6] & 0xF0) >> 4;
+ e += (buff[off+7] & 0x7F) << 4;
+
+ var m = buff[off];
+ m += buff[off+1] << 8;
+ m += buff[off+2] << 16;
+ m += buff[off+3] * POW_24;
+ m += buff[off+4] * POW_32;
+ m += buff[off+5] * POW_40;
+ m += (buff[off+6] & 0x0F) * POW_48;
+
+ switch (e) {
+ case 0:
+ e = -1022;
+ break;
+ case 2047:
+ return m ? NaN : (signed ? -Infinity : Infinity);
+ default:
+ m += POW_52;
+ e -= 1023;
+ }
+
+ if (signed) {
+ m *= -1;
+ }
+
+ return m * Math.pow(2, e - 52);
+};
+
+TCompactProtocol.prototype.readBinary = function() {
+ var size = this.readVarint32();
+ // Catch empty string case
+ if (size === 0) {
+ return "";
+ }
+
+ // Catch error cases
+ if (size < 0) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative binary/string size");
+ }
+ var value = this.trans.readString(size);
+
+ return value;
+};
+
+TCompactProtocol.prototype.readString = function() {
+ return this.readBinary();
+};
+
+
+//
+// Compact Protocol internal read operations
+//
+
+/**
+ * Read an i32 from the wire as a varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 5 bytes.
+ */
+TCompactProtocol.prototype.readVarint32 = function() {
+ return this.readVarint64();
+};
+
+/**
+ * Read an i64 from the wire as a proper varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 10 bytes.
+ */
+TCompactProtocol.prototype.readVarint64 = function() {
+ var rsize = 0;
+ var lo = 0;
+ var hi = 0;
+ var shift = 0;
+ while (true) {
+ var b = this.trans.readByte();
+ rsize ++;
+ if (shift <= 25) {
+ lo = lo | ((b & 0x7f) << shift);
+ } else if (25 < shift && shift < 32) {
+ lo = lo | ((b & 0x7f) << shift);
+ hi = hi | ((b & 0x7f) >>> (32-shift));
+ } else {
+ hi = hi | ((b & 0x7f) << (shift-32));
+ }
+ shift += 7;
+ if (!(b & 0x80)) {
+ break;
+ }
+ if (rsize >= 10) {
+ throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Variable-length int over 10 bytes.");
+ }
+ }
+ var i64 = new Int64(hi, lo);
+ return i64.toNumber();
+};
+
+/**
+ * Convert from zigzag int to int.
+ */
+TCompactProtocol.prototype.zigzagToI32 = function(n) {
+ return (n >>> 1) ^ (-1 * (n & 1));
+};
+
+/**
+ * Convert from zigzag long to long.
+ */
+TCompactProtocol.prototype.zigzagToI64 = function(n) {
+ var zz = new Int64(n);
+ var hi = zz.buffer.readUInt32BE(0, true);
+ var lo = zz.buffer.readUInt32BE(4, true);
+
+ var neg = new Int64(hi & 0, lo & 1);
+ neg._2scomp();
+ var hi_neg = neg.buffer.readUInt32BE(0, true);
+ var lo_neg = neg.buffer.readUInt32BE(4, true);
+
+ var hi_lo = (hi << 31);
+ hi = (hi >>> 1) ^ (hi_neg);
+ lo = ((lo >>> 1) | hi_lo) ^ (lo_neg);
+ var i64 = new Int64(hi, lo);
+ return i64.toNumber();
+};
+
+TCompactProtocol.prototype.skip = function(type) {
+ switch (type) {
+ case Type.STOP:
+ return;
+ case Type.BOOL:
+ this.readBool();
+ break;
+ case Type.BYTE:
+ this.readByte();
+ break;
+ case Type.I16:
+ this.readI16();
+ break;
+ case Type.I32:
+ this.readI32();
+ break;
+ case Type.I64:
+ this.readI64();
+ break;
+ case Type.DOUBLE:
+ this.readDouble();
+ break;
+ case Type.STRING:
+ this.readString();
+ break;
+ case Type.STRUCT:
+ this.readStructBegin();
+ while (true) {
+ var r = this.readFieldBegin();
+ if (r.ftype === Type.STOP) {
+ break;
+ }
+ this.skip(r.ftype);
+ this.readFieldEnd();
+ }
+ this.readStructEnd();
+ break;
+ case Type.MAP:
+ var mapBegin = this.readMapBegin();
+ for (var i = 0; i < mapBegin.size; ++i) {
+ this.skip(mapBegin.ktype);
+ this.skip(mapBegin.vtype);
+ }
+ this.readMapEnd();
+ break;
+ case Type.SET:
+ var setBegin = this.readSetBegin();
+ for (var i2 = 0; i2 < setBegin.size; ++i2) {
+ this.skip(setBegin.etype);
+ }
+ this.readSetEnd();
+ break;
+ case Type.LIST:
+ var listBegin = this.readListBegin();
+ for (var i3 = 0; i3 < listBegin.size; ++i3) {
+ this.skip(listBegin.etype);
+ }
+ this.readListEnd();
+ break;
+ default:
+ throw new Error("Invalid type: " + type);
+ }
+};
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index aa985df..e836e30 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -16,43 +16,47 @@
* specific language governing permissions and limitations
* under the License.
*/
-var util = require('util'),
- EventEmitter = require("events").EventEmitter,
- net = require('net'),
- tls = require('tls'),
- ttransport = require('./transport'),
- tprotocol = require('./protocol'),
- thrift = require('./thrift');
+var util = require('util');
+var EventEmitter = require("events").EventEmitter;
+var net = require('net');
+var tls = require('tls');
+var thrift = require('./thrift');
+
+var TBufferedTransport = require('./buffered_transport');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+var createClient = require('./create_client');
var binary = require('./binary');
var Connection = exports.Connection = function(stream, options) {
var self = this;
EventEmitter.call(this);
-
+
this.seqId2Service = {};
this.connection = stream;
this.options = options || {};
- this.transport = this.options.transport || ttransport.TBufferedTransport;
- this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.transport = this.options.transport || TBufferedTransport;
+ this.protocol = this.options.protocol || TBinaryProtocol;
this.offline_queue = [];
this.connected = false;
this._debug = this.options.debug || false;
- if (this.options.max_attempts &&
- !isNaN(this.options.max_attempts) &&
+ if (this.options.max_attempts &&
+ !isNaN(this.options.max_attempts) &&
this.options.max_attempts > 0) {
this.max_attempts = +this.options.max_attempts;
}
this.retry_max_delay = null;
- if (this.options.retry_max_delay !== undefined &&
- !isNaN(this.options.retry_max_delay) &&
+ if (this.options.retry_max_delay !== undefined &&
+ !isNaN(this.options.retry_max_delay) &&
this.options.retry_max_delay > 0) {
this.retry_max_delay = this.options.retry_max_delay;
}
this.connect_timeout = false;
- if (this.options.connect_timeout &&
- !isNaN(this.options.connect_timeout) &&
+ if (this.options.connect_timeout &&
+ !isNaN(this.options.connect_timeout) &&
this.options.connect_timeout > 0) {
this.connect_timeout = +this.options.connect_timeout;
}
@@ -94,7 +98,7 @@
this.connection.addListener("error", function(err) {
// Only emit the error if no-one else is listening on the connection
// or if someone is listening on us
- if (self.connection.listeners('error').length === 1 ||
+ if (self.connection.listeners('error').length === 1 ||
self.listeners('error').length > 0) {
self.emit("error", err);
}
@@ -123,12 +127,12 @@
// in seqId2Service. If the SeqId is found in the hash we need to
// lookup the appropriate client for this call.
// The connection.client object is a single client object when not
- // multiplexing, when using multiplexing it is a service name keyed
+ // multiplexing, when using multiplexing it is a service name keyed
// hash of client objects.
//NOTE: The 2 way interdependencies between protocols, transports,
// connections and clients in the Node.js implementation are irregular
- // and make the implementation difficult to extend and maintain. We
- // should bring this stuff inline with typical thrift I/O stack
+ // and make the implementation difficult to extend and maintain. We
+ // should bring this stuff inline with typical thrift I/O stack
// operation soon.
// --ra
var service_name = self.seqId2Service[header.rseqid];
@@ -137,7 +141,7 @@
delete self.seqId2Service[header.rseqid];
}
/*jshint -W083 */
- client._reqs[dummy_seqid] = function(err, success){
+ client._reqs[dummy_seqid] = function(err, success){
transport_with_data.commitPosition();
var callback = client._reqs[header.rseqid];
@@ -152,14 +156,14 @@
client['recv_' + header.fname](message, header.mtype, dummy_seqid);
} else {
delete client._reqs[dummy_seqid];
- self.emit("error",
+ self.emit("error",
new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
"Received a response to an unknown RPC function"));
}
}
}
catch (e) {
- if (e instanceof ttransport.InputBufferUnderrunError) {
+ if (e instanceof InputBufferUnderrunError) {
transport_with_data.rollbackPosition();
}
else {
@@ -266,19 +270,7 @@
};
-exports.createClient = function(cls, connection) {
- if (cls.Client) {
- cls = cls.Client;
- }
- var client = new cls(new connection.transport(undefined, function(buf) {
- connection.write(buf);
- }), connection.protocol);
-
- // TODO clean this up
- connection.client = client;
-
- return client;
-};
+exports.createClient = createClient;
var child_process = require('child_process');
var StdIOConnection = exports.StdIOConnection = function(command, options) {
@@ -293,8 +285,8 @@
this._debug = options.debug || false;
this.connection = child.stdin;
this.options = options || {};
- this.transport = this.options.transport || ttransport.TBufferedTransport;
- this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.transport = this.options.transport || TBufferedTransport;
+ this.protocol = this.options.protocol || TBinaryProtocol;
this.offline_queue = [];
if(this._debug === true){
@@ -344,7 +336,7 @@
client['recv_' + header.fname](message, header.mtype, dummy_seqid);
}
catch (e) {
- if (e instanceof ttransport.InputBufferUnderrunError) {
+ if (e instanceof InputBufferUnderrunError) {
transport_with_data.rollbackPosition();
}
else {
@@ -372,17 +364,4 @@
return new StdIOConnection(command,options);
};
-exports.createStdIOClient = function(cls,connection) {
- if (cls.Client) {
- cls = cls.Client;
- }
-
- var client = new cls(new connection.transport(undefined, function(buf) {
- connection.write(buf);
- }), connection.protocol);
-
- // TODO clean this up
- connection.client = client;
-
- return client;
-};
+exports.createStdIOClient = createClient;
diff --git a/lib/nodejs/lib/thrift/create_client.js b/lib/nodejs/lib/thrift/create_client.js
new file mode 100644
index 0000000..d6b77a8
--- /dev/null
+++ b/lib/nodejs/lib/thrift/create_client.js
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+module.exports = createClient;
+
+/**
+ * Creates a new client object for the specified Thrift service.
+ * @param {object} ServiceClient - The module containing the generated service client
+ * @param {Connection} Connection - The connection to use.
+ * @returns {object} The client object.
+ */
+function createClient(ServiceClient, connection) {
+ // TODO validate required options and throw otherwise
+ if (ServiceClient.Client) {
+ ServiceClient = ServiceClient.Client;
+ }
+ // TODO detangle these initialization calls
+ // creating "client" requires
+ // - new service client instance
+ //
+ // New service client instance requires
+ // - new transport instance
+ // - protocol class reference
+ //
+ // New transport instance requires
+ // - Buffer to use (or none)
+ // - Callback to call on flush
+
+ // Wrap the write method
+ var writeCb = function(buf, seqid) {
+ connection.write(buf, seqid);
+ };
+ var transport = new connection.transport(undefined, writeCb);
+ var client = new ServiceClient(transport, connection.protocol);
+ transport.client = client;
+ connection.client = client;
+ return client;
+};
diff --git a/lib/nodejs/lib/thrift/framed_transport.js b/lib/nodejs/lib/thrift/framed_transport.js
new file mode 100644
index 0000000..6947925
--- /dev/null
+++ b/lib/nodejs/lib/thrift/framed_transport.js
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var binary = require('./binary');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+module.exports = TFramedTransport;
+
+function TFramedTransport(buffer, callback) {
+ this.inBuf = buffer || new Buffer(0);
+ this.outBuffers = [];
+ this.outCount = 0;
+ this.readPos = 0;
+ this.onFlush = callback;
+};
+
+TFramedTransport.receiver = function(callback, seqid) {
+ var residual = null;
+
+ return function(data) {
+ // Prepend any residual data from our previous read
+ if (residual) {
+ data = Buffer.concat([residual, data]);
+ residual = null;
+ }
+
+ // framed transport
+ while (data.length) {
+ if (data.length < 4) {
+ // Not enough bytes to continue, save and resume on next packet
+ residual = data;
+ return;
+ }
+ var frameSize = binary.readI32(data, 0);
+ if (data.length < 4 + frameSize) {
+ // Not enough bytes to continue, save and resume on next packet
+ residual = data;
+ return;
+ }
+
+ var frame = data.slice(4, 4 + frameSize);
+ residual = data.slice(4 + frameSize);
+
+ callback(new TFramedTransport(frame), seqid);
+
+ data = residual;
+ residual = null;
+ }
+ };
+};
+
+TFramedTransport.prototype.commitPosition = function(){},
+TFramedTransport.prototype.rollbackPosition = function(){},
+
+ // TODO: Implement open/close support
+TFramedTransport.prototype.isOpen = function() {
+ return true;
+};
+TFramedTransport.prototype.open = function() {};
+TFramedTransport.prototype.close = function() {};
+
+ // Set the seqid of the message in the client
+ // So that callbacks can be found
+TFramedTransport.prototype.setCurrSeqId = function(seqid) {
+ this._seqid = seqid;
+};
+
+TFramedTransport.prototype.ensureAvailable = function(len) {
+ if (this.readPos + len > this.inBuf.length) {
+ throw new InputBufferUnderrunError();
+ }
+};
+
+TFramedTransport.prototype.read = function(len) { // this function will be used for each frames.
+ this.ensureAvailable(len);
+ var end = this.readPos + len;
+
+ if (this.inBuf.length < end) {
+ throw new Error('read(' + len + ') failed - not enough data');
+ }
+
+ var buf = this.inBuf.slice(this.readPos, end);
+ this.readPos = end;
+ return buf;
+};
+
+TFramedTransport.prototype.readByte = function() {
+ this.ensureAvailable(1);
+ return binary.readByte(this.inBuf[this.readPos++]);
+};
+
+TFramedTransport.prototype.readI16 = function() {
+ this.ensureAvailable(2);
+ var i16 = binary.readI16(this.inBuf, this.readPos);
+ this.readPos += 2;
+ return i16;
+};
+
+TFramedTransport.prototype.readI32 = function() {
+ this.ensureAvailable(4);
+ var i32 = binary.readI32(this.inBuf, this.readPos);
+ this.readPos += 4;
+ return i32;
+};
+
+TFramedTransport.prototype.readDouble = function() {
+ this.ensureAvailable(8);
+ var d = binary.readDouble(this.inBuf, this.readPos);
+ this.readPos += 8;
+ return d;
+};
+
+TFramedTransport.prototype.readString = function(len) {
+ this.ensureAvailable(len);
+ var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len);
+ this.readPos += len;
+ return str;
+};
+
+TFramedTransport.prototype.borrow = function() {
+ return {
+ buf: this.inBuf,
+ readIndex: this.readPos,
+ writeIndex: this.inBuf.length
+ };
+};
+
+TFramedTransport.prototype.consume = function(bytesConsumed) {
+ this.readPos += bytesConsumed;
+};
+
+TFramedTransport.prototype.write = function(buf, encoding) {
+ if (typeof(buf) === "string") {
+ buf = new Buffer(buf, encoding || 'utf8');
+ }
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+};
+
+TFramedTransport.prototype.flush = function() {
+ // If the seqid of the callback is available pass it to the onFlush
+ // Then remove the current seqid
+ var seqid = this._seqid;
+ this._seqid = null;
+
+ var out = new Buffer(this.outCount),
+ pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(out, pos, 0);
+ pos += buf.length;
+ });
+
+ if (this.onFlush) {
+ // TODO: optimize this better, allocate one buffer instead of both:
+ var msg = new Buffer(out.length + 4);
+ binary.writeI32(msg, out.length);
+ out.copy(msg, 4, 0, out.length);
+ if (this.onFlush) {
+ // Passing seqid through this call to get it to the connection
+ this.onFlush(msg, seqid);
+ }
+ }
+
+ this.outBuffers = [];
+ this.outCount = 0;
+};
diff --git a/lib/nodejs/lib/thrift/http_connection.js b/lib/nodejs/lib/thrift/http_connection.js
index b7659bc..f3fcd74 100644
--- a/lib/nodejs/lib/thrift/http_connection.js
+++ b/lib/nodejs/lib/thrift/http_connection.js
@@ -21,8 +21,12 @@
var https = require('https');
var EventEmitter = require('events').EventEmitter;
var thrift = require('./thrift');
-var ttransport = require('./transport');
-var tprotocol = require('./protocol');
+
+var TBufferedTransport = require('./buffered_transport');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+var createClient = require('./create_client');
/**
* @class
@@ -30,14 +34,14 @@
* @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc).
* @property {string} protocol - The Thrift serialization protocol to use (TBinaryProtocol, etc.).
* @property {string} path - The URL path to POST to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.).
- * @property {object} headers - A standard Node.js header hash, an object hash containing key/value
+ * @property {object} headers - A standard Node.js header hash, an object hash containing key/value
* pairs where the key is the header name string and the value is the header value string.
* @property {boolean} https - True causes the connection to use https, otherwise http is used.
* @property {object} nodeOptions - Options passed on to node.
* @example
* //Use a connection that requires ssl/tls, closes the connection after each request,
* // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic
- * // to https://thrift.example.com:9090/hello
+ * // to https://thrift.example.com:9090/hello
* var thrift = require('thrift');
* var options = {
* transport: thrift.TBufferedTransport,
@@ -52,18 +56,18 @@
*/
/**
- * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than
+ * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than
* instantiating directly).
* @constructor
* @param {string} host - The host name or IP to connect to.
* @param {number} port - The TCP port to connect to.
* @param {ConnectOptions} options - The configuration options to use.
- * @throws {error} Exceptions other than ttransport.InputBufferUnderrunError are rethrown
+ * @throws {error} Exceptions other than InputBufferUnderrunError are rethrown
* @event {error} The "error" event is fired when a Node.js error event occurs during
* request or response processing, in which case the node error is passed on. An "error"
* event may also be fired when the connection can not map a response back to the
* appropriate client (an internal error), generating a TApplicationException.
- * @classdesc HttpConnection objects provide Thrift end point transport
+ * @classdesc HttpConnection objects provide Thrift end point transport
* semantics implemented over the Node.js http.request() method.
* @see {@link createHttpConnection}
*/
@@ -77,8 +81,8 @@
this.host = host;
this.port = port;
this.https = this.options.https || false;
- this.transport = this.options.transport || ttransport.TBufferedTransport;
- this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.transport = this.options.transport || TBufferedTransport;
+ this.protocol = this.options.protocol || TBinaryProtocol;
//Prepare Node.js options
this.nodeOptions = {
@@ -89,8 +93,8 @@
headers: this.options.headers || {},
responseType: this.options.responseType || null
};
- for (var attrname in this.options.nodeOptions) {
- this.nodeOptions[attrname] = this.options.nodeOptions[attrname];
+ for (var attrname in this.options.nodeOptions) {
+ this.nodeOptions[attrname] = this.options.nodeOptions[attrname];
}
/*jshint -W069 */
if (! this.nodeOptions.headers['Connection']) {
@@ -98,7 +102,7 @@
}
/*jshint +W069 */
- //The sequence map is used to map seqIDs back to the
+ //The sequence map is used to map seqIDs back to the
// calling client in multiplexed scenarios
this.seqId2Service = {};
@@ -112,13 +116,13 @@
//The Multiplexed Protocol stores a hash of seqid to service names
// in seqId2Service. If the SeqId is found in the hash we need to
// lookup the appropriate client for this call.
- // The client var is a single client object when not multiplexing,
+ // The client var is a single client object when not multiplexing,
// when using multiplexing it is a service name keyed hash of client
// objects.
//NOTE: The 2 way interdependencies between protocols, transports,
// connections and clients in the Node.js implementation are irregular
- // and make the implementation difficult to extend and maintain. We
- // should bring this stuff inline with typical thrift I/O stack
+ // and make the implementation difficult to extend and maintain. We
+ // should bring this stuff inline with typical thrift I/O stack
// operation soon.
// --ra
var service_name = self.seqId2Service[header.rseqid];
@@ -127,7 +131,7 @@
delete self.seqId2Service[header.rseqid];
}
/*jshint -W083 */
- client._reqs[dummy_seqid] = function(err, success){
+ client._reqs[dummy_seqid] = function(err, success){
transport_with_data.commitPosition();
var clientCallback = client._reqs[header.rseqid];
delete client._reqs[header.rseqid];
@@ -150,15 +154,15 @@
}
}
catch (e) {
- if (e instanceof ttransport.InputBufferUnderrunError) {
+ if (e instanceof InputBufferUnderrunError) {
transport_with_data.rollbackPosition();
} else {
throw e;
}
}
}
-
-
+
+
//Response handler
//////////////////////////////////////////////////
this.responseCallback = function(response) {
@@ -180,14 +184,14 @@
} else {
data.push(chunk);
}
- dataLen += chunk.length;
+ dataLen += chunk.length;
});
response.on('end', function(){
- var buf = new Buffer(dataLen);
- for (var i=0, len=data.length, pos=0; i<len; i++) {
- data[i].copy(buf, pos);
- pos += data[i].length;
+ var buf = new Buffer(dataLen);
+ for (var i=0, len=data.length, pos=0; i<len; i++) {
+ data[i].copy(buf, pos);
+ pos += data[i].length;
}
//Get the receiver function for the transport and
// call it with the buffer
@@ -201,8 +205,8 @@
* Writes Thrift message data to the connection
* @param {Buffer} data - A Node.js Buffer containing the data to write
* @returns {void} No return value.
- * @event {error} the "error" event is raised upon request failure passing the
- * Node.js error object to the listener.
+ * @event {error} the "error" event is raised upon request failure passing the
+ * Node.js error object to the listener.
*/
HttpConnection.prototype.write = function(data) {
var self = this;
@@ -212,7 +216,7 @@
http.request(self.nodeOptions, self.responseCallback);
req.on('error', function(err) {
self.emit("error", err);
- });
+ });
req.write(data);
req.end();
};
@@ -230,20 +234,5 @@
return new HttpConnection(host, port, options);
};
-/**
- * Creates a new client object for the specified Thrift service.
- * @param {object} cls - The module containing the service client
- * @param {HttpConnection} httpConnection - The connection to use.
- * @returns {object} The client object.
- * @see {@link createHttpConnection}
- */
-exports.createHttpClient = function(cls, httpConnection) {
- if (cls.Client) {
- cls = cls.Client;
- }
- httpConnection.client =
- new cls(new httpConnection.transport(undefined, function(buf) {httpConnection.write(buf);}),
- httpConnection.protocol);
- return httpConnection.client;
-};
+exports.createHttpClient = createClient
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index 9b53dd0..e313dbb 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -55,8 +55,8 @@
* Export transport and protocol so they can be used outside of a
* cassandra/server context
*/
-exports.TFramedTransport = require('./transport').TFramedTransport;
-exports.TBufferedTransport = require('./transport').TBufferedTransport;
-exports.TBinaryProtocol = require('./protocol').TBinaryProtocol;
-exports.TJSONProtocol = require('./protocol').TJSONProtocol;
-exports.TCompactProtocol = require('./protocol').TCompactProtocol;
+exports.TFramedTransport = require('./framed_transport');
+exports.TBufferedTransport = require('./buffered_transport');
+exports.TBinaryProtocol = require('./binary_protocol');
+exports.TJSONProtocol = require('./json_protocol');
+exports.TCompactProtocol = require('./compact_protocol');
diff --git a/lib/nodejs/lib/thrift/input_buffer_underrun_error.js b/lib/nodejs/lib/thrift/input_buffer_underrun_error.js
new file mode 100644
index 0000000..4d4237b
--- /dev/null
+++ b/lib/nodejs/lib/thrift/input_buffer_underrun_error.js
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var util = require("util");
+
+module.exports = InputBufferUnderrunError;
+
+function InputBufferUnderrunError(message) {
+ Error.call(this, message);
+};
+
+util.inherits(InputBufferUnderrunError, Error);
diff --git a/lib/nodejs/lib/thrift/json_protocol.js b/lib/nodejs/lib/thrift/json_protocol.js
new file mode 100644
index 0000000..f4678f5
--- /dev/null
+++ b/lib/nodejs/lib/thrift/json_protocol.js
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var log = require('./log');
+var Int64 = require('node-int64');
+var InputBufferUnderrunError = require('./transport').InputBufferUnderrunError;
+var Thrift = require('./thrift');
+var Type = Thrift.Type;
+var util = require("util");
+
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+module.exports = TJSONProtocol;
+
+/**
+ * Initializes a Thrift JSON protocol instance.
+ * @constructor
+ * @param {Thrift.Transport} trans - The transport to serialize to/from.
+ * @classdesc Apache Thrift Protocols perform serialization which enables cross
+ * language RPC. The Protocol type is the JavaScript browser implementation
+ * of the Apache Thrift TJSONProtocol.
+ * @example
+ * var protocol = new Thrift.Protocol(transport);
+ */
+function TJSONProtocol(trans) {
+ this.trans = trans;
+};
+
+/**
+ * Thrift IDL type Id to string mapping.
+ * @readonly
+ * @see {@link Thrift.Type}
+ */
+TJSONProtocol.Type = {};
+TJSONProtocol.Type[Type.BOOL] = '"tf"';
+TJSONProtocol.Type[Type.BYTE] = '"i8"';
+TJSONProtocol.Type[Type.I16] = '"i16"';
+TJSONProtocol.Type[Type.I32] = '"i32"';
+TJSONProtocol.Type[Type.I64] = '"i64"';
+TJSONProtocol.Type[Type.DOUBLE] = '"dbl"';
+TJSONProtocol.Type[Type.STRUCT] = '"rec"';
+TJSONProtocol.Type[Type.STRING] = '"str"';
+TJSONProtocol.Type[Type.MAP] = '"map"';
+TJSONProtocol.Type[Type.LIST] = '"lst"';
+TJSONProtocol.Type[Type.SET] = '"set"';
+
+/**
+ * Thrift IDL type string to Id mapping.
+ * @readonly
+ * @see {@link Thrift.Type}
+ */
+TJSONProtocol.RType = {};
+TJSONProtocol.RType.tf = Type.BOOL;
+TJSONProtocol.RType.i8 = Type.BYTE;
+TJSONProtocol.RType.i16 = Type.I16;
+TJSONProtocol.RType.i32 = Type.I32;
+TJSONProtocol.RType.i64 = Type.I64;
+TJSONProtocol.RType.dbl = Type.DOUBLE;
+TJSONProtocol.RType.rec = Type.STRUCT;
+TJSONProtocol.RType.str = Type.STRING;
+TJSONProtocol.RType.map = Type.MAP;
+TJSONProtocol.RType.lst = Type.LIST;
+TJSONProtocol.RType.set = Type.SET;
+
+/**
+ * The TJSONProtocol version number.
+ * @readonly
+ * @const {number} Version
+ * @memberof Thrift.Protocol
+ */
+TJSONProtocol.Version = 1;
+
+TJSONProtocol.prototype.flush = function() {
+ return this.trans.flush();
+};
+
+/**
+ * Serializes the beginning of a Thrift RPC message.
+ * @param {string} name - The service method to call.
+ * @param {Thrift.MessageType} messageType - The type of method call.
+ * @param {number} seqid - The sequence number of this call (always 0 in Apache Thrift).
+ */
+TJSONProtocol.prototype.writeMessageBegin = function(name, messageType, seqid) {
+ this.tstack = [];
+ this.tpos = [];
+
+ this.tstack.push([TJSONProtocol.Version, '"' + name + '"', messageType, seqid]);
+};
+
+/**
+ * Serializes the end of a Thrift RPC message.
+ */
+TJSONProtocol.prototype.writeMessageEnd = function() {
+ var obj = this.tstack.pop();
+
+ this.wobj = this.tstack.pop();
+ this.wobj.push(obj);
+
+ this.wbuf = '[' + this.wobj.join(',') + ']';
+
+ this.trans.write(this.wbuf);
+};
+
+/**
+ * Serializes the beginning of a struct.
+ * @param {string} name - The name of the struct.
+ */
+TJSONProtocol.prototype.writeStructBegin = function(name) {
+ this.tpos.push(this.tstack.length);
+ this.tstack.push({});
+};
+
+/**
+ * Serializes the end of a struct.
+ */
+TJSONProtocol.prototype.writeStructEnd = function() {
+ var p = this.tpos.pop();
+ var struct = this.tstack[p];
+ var str = '{';
+ var first = true;
+ for (var key in struct) {
+ if (first) {
+ first = false;
+ } else {
+ str += ',';
+ }
+
+ str += key + ':' + struct[key];
+ }
+
+ str += '}';
+ this.tstack[p] = str;
+};
+
+/**
+ * Serializes the beginning of a struct field.
+ * @param {string} name - The name of the field.
+ * @param {Thrift.Protocol.Type} fieldType - The data type of the field.
+ * @param {number} fieldId - The field's unique identifier.
+ */
+TJSONProtocol.prototype.writeFieldBegin = function(name, fieldType, fieldId) {
+ this.tpos.push(this.tstack.length);
+ this.tstack.push({ 'fieldId': '"' +
+ fieldId + '"', 'fieldType': TJSONProtocol.Type[fieldType]
+ });
+};
+
+/**
+ * Serializes the end of a field.
+ */
+TJSONProtocol.prototype.writeFieldEnd = function() {
+ var value = this.tstack.pop();
+ var fieldInfo = this.tstack.pop();
+
+ if (':' + value === ":[object Object]") {
+ this.tstack[this.tstack.length - 1][fieldInfo.fieldId] = '{' +
+ fieldInfo.fieldType + ':' + JSON.stringify(value) + '}';
+ } else {
+ this.tstack[this.tstack.length - 1][fieldInfo.fieldId] = '{' +
+ fieldInfo.fieldType + ':' + value + '}';
+ }
+ this.tpos.pop();
+};
+
+/**
+ * Serializes the end of the set of fields for a struct.
+ */
+TJSONProtocol.prototype.writeFieldStop = function() {
+};
+
+/**
+ * Serializes the beginning of a map collection.
+ * @param {Thrift.Type} keyType - The data type of the key.
+ * @param {Thrift.Type} valType - The data type of the value.
+ * @param {number} [size] - The number of elements in the map (ignored).
+ */
+TJSONProtocol.prototype.writeMapBegin = function(keyType, valType, size) {
+ //size is invalid, we'll set it on end.
+ this.tpos.push(this.tstack.length);
+ this.tstack.push([TJSONProtocol.Type[keyType], TJSONProtocol.Type[valType], 0]);
+};
+
+/**
+ * Serializes the end of a map.
+ */
+TJSONProtocol.prototype.writeMapEnd = function() {
+ var p = this.tpos.pop();
+
+ if (p == this.tstack.length) {
+ return;
+ }
+
+ if ((this.tstack.length - p - 1) % 2 !== 0) {
+ this.tstack.push('');
+ }
+
+ var size = (this.tstack.length - p - 1) / 2;
+
+ this.tstack[p][this.tstack[p].length - 1] = size;
+
+ var map = '}';
+ var first = true;
+ while (this.tstack.length > p + 1) {
+ var v = this.tstack.pop();
+ var k = this.tstack.pop();
+ if (first) {
+ first = false;
+ } else {
+ map = ',' + map;
+ }
+
+ if (! isNaN(k)) { k = '"' + k + '"'; } //json "keys" need to be strings
+ map = k + ':' + v + map;
+ }
+ map = '{' + map;
+
+ this.tstack[p].push(map);
+ this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
+};
+
+/**
+ * Serializes the beginning of a list collection.
+ * @param {Thrift.Type} elemType - The data type of the elements.
+ * @param {number} size - The number of elements in the list.
+ */
+TJSONProtocol.prototype.writeListBegin = function(elemType, size) {
+ this.tpos.push(this.tstack.length);
+ this.tstack.push([TJSONProtocol.Type[elemType], size]);
+};
+
+/**
+ * Serializes the end of a list.
+ */
+TJSONProtocol.prototype.writeListEnd = function() {
+ var p = this.tpos.pop();
+
+ while (this.tstack.length > p + 1) {
+ var tmpVal = this.tstack[p + 1];
+ this.tstack.splice(p + 1, 1);
+ this.tstack[p].push(tmpVal);
+ }
+
+ this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
+};
+
+/**
+ * Serializes the beginning of a set collection.
+ * @param {Thrift.Type} elemType - The data type of the elements.
+ * @param {number} size - The number of elements in the list.
+ */
+TJSONProtocol.prototype.writeSetBegin = function(elemType, size) {
+ this.tpos.push(this.tstack.length);
+ this.tstack.push([TJSONProtocol.Type[elemType], size]);
+};
+
+/**
+ * Serializes the end of a set.
+ */
+TJSONProtocol.prototype.writeSetEnd = function() {
+ var p = this.tpos.pop();
+
+ while (this.tstack.length > p + 1) {
+ var tmpVal = this.tstack[p + 1];
+ this.tstack.splice(p + 1, 1);
+ this.tstack[p].push(tmpVal);
+ }
+
+ this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
+};
+
+/** Serializes a boolean */
+TJSONProtocol.prototype.writeBool = function(bool) {
+ this.tstack.push(bool ? 1 : 0);
+};
+
+/** Serializes a number */
+TJSONProtocol.prototype.writeByte = function(byte) {
+ this.tstack.push(byte);
+};
+
+/** Serializes a number */
+TJSONProtocol.prototype.writeI16 = function(i16) {
+ this.tstack.push(i16);
+};
+
+/** Serializes a number */
+TJSONProtocol.prototype.writeI32 = function(i32) {
+ this.tstack.push(i32);
+};
+
+/** Serializes a number */
+TJSONProtocol.prototype.writeI64 = function(i64) {
+ this.tstack.push(i64);
+};
+
+/** Serializes a number */
+TJSONProtocol.prototype.writeDouble = function(dub) {
+ this.tstack.push(dub);
+};
+
+/** Serializes a string */
+TJSONProtocol.prototype.writeString = function(str) {
+ // We do not encode uri components for wire transfer:
+ if (str === null) {
+ this.tstack.push(null);
+ } else {
+ // concat may be slower than building a byte buffer
+ var escapedString = '';
+ for (var i = 0; i < str.length; i++) {
+ var ch = str.charAt(i); // a single double quote: "
+ if (ch === '\"') {
+ escapedString += '\\\"'; // write out as: \"
+ } else if (ch === '\\') { // a single backslash: \
+ escapedString += '\\\\'; // write out as: \\
+ /* Currently escaped forward slashes break TJSONProtocol.
+ * As it stands, we can simply pass forward slashes into
+ * our strings across the wire without being escaped.
+ * I think this is the protocol's bug, not thrift.js
+ * } else if(ch === '/') { // a single forward slash: /
+ * escapedString += '\\/'; // write out as \/
+ * }
+ */
+ } else if (ch === '\b') { // a single backspace: invisible
+ escapedString += '\\b'; // write out as: \b"
+ } else if (ch === '\f') { // a single formfeed: invisible
+ escapedString += '\\f'; // write out as: \f"
+ } else if (ch === '\n') { // a single newline: invisible
+ escapedString += '\\n'; // write out as: \n"
+ } else if (ch === '\r') { // a single return: invisible
+ escapedString += '\\r'; // write out as: \r"
+ } else if (ch === '\t') { // a single tab: invisible
+ escapedString += '\\t'; // write out as: \t"
+ } else {
+ escapedString += ch; // Else it need not be escaped
+ }
+ }
+ this.tstack.push('"' + escapedString + '"');
+ }
+};
+
+/** Serializes a string */
+TJSONProtocol.prototype.writeBinary = function(arg) {
+ this.writeString(arg);
+};
+
+/**
+ * @class
+ * @name AnonReadMessageBeginReturn
+ * @property {string} fname - The name of the service method.
+ * @property {Thrift.MessageType} mtype - The type of message call.
+ * @property {number} rseqid - The sequence number of the message (0 in Thrift RPC).
+ */
+/**
+ * Deserializes the beginning of a message.
+ * @returns {AnonReadMessageBeginReturn}
+ */
+TJSONProtocol.prototype.readMessageBegin = function() {
+ this.rstack = [];
+ this.rpos = [];
+
+ //Borrow the inbound transport buffer and ensure data is present/consistent
+ var transBuf = this.trans.borrow();
+ if (transBuf.readIndex >= transBuf.writeIndex) {
+ throw new InputBufferUnderrunError();
+ }
+ var cursor = transBuf.readIndex;
+
+ if (transBuf.buf[cursor] !== 0x5B) { //[
+ throw new Error("Malformed JSON input, no opening bracket");
+ }
+
+ //Parse a single message (there may be several in the buffer)
+ // TODO: Handle characters using multiple code units
+ cursor++;
+ var openBracketCount = 1;
+ var inString = false;
+ for (; cursor < transBuf.writeIndex; cursor++) {
+ var chr = transBuf.buf[cursor];
+ //we use hexa charcode here because data[i] returns an int and not a char
+ if (inString) {
+ if (chr === 0x22) { //"
+ inString = false;
+ } else if (chr === 0x5C) { //\
+ //escaped character, skip
+ cursor += 1;
+ }
+ } else {
+ if (chr === 0x5B) { //[
+ openBracketCount += 1;
+ } else if (chr === 0x5D) { //]
+ openBracketCount -= 1;
+ if (openBracketCount === 0) {
+ //end of json message detected
+ break;
+ }
+ } else if (chr === 0x22) { //"
+ inString = true;
+ }
+ }
+ }
+
+ if (openBracketCount !== 0) {
+ throw new Error("Malformed JSON input, mismatched backets");
+ }
+
+ //Reconstitute the JSON object and conume the necessary bytes
+ this.robj = JSON.parse(transBuf.buf.slice(transBuf.readIndex, cursor+1));
+ this.trans.consume(cursor + 1 - transBuf.readIndex);
+
+ //Verify the protocol version
+ var version = this.robj.shift();
+ if (version != TJSONProtocol.Version) {
+ throw 'Wrong thrift protocol version: ' + version;
+ }
+
+ //Objectify the thrift message {name/type/sequence-number} for return
+ // and then save the JSON object in rstack
+ var r = {};
+ r.fname = this.robj.shift();
+ r.mtype = this.robj.shift();
+ r.rseqid = this.robj.shift();
+ this.rstack.push(this.robj.shift());
+ return r;
+};
+
+/** Deserializes the end of a message. */
+TJSONProtocol.prototype.readMessageEnd = function() {
+};
+
+/**
+ * Deserializes the beginning of a struct.
+ * @param {string} [name] - The name of the struct (ignored)
+ * @returns {object} - An object with an empty string fname property
+ */
+TJSONProtocol.prototype.readStructBegin = function() {
+ var r = {};
+ r.fname = '';
+
+ //incase this is an array of structs
+ if (this.rstack[this.rstack.length - 1] instanceof Array) {
+ this.rstack.push(this.rstack[this.rstack.length - 1].shift());
+ }
+
+ return r;
+};
+
+/** Deserializes the end of a struct. */
+TJSONProtocol.prototype.readStructEnd = function() {
+ this.rstack.pop();
+};
+
+/**
+ * @class
+ * @name AnonReadFieldBeginReturn
+ * @property {string} fname - The name of the field (always '').
+ * @property {Thrift.Type} ftype - The data type of the field.
+ * @property {number} fid - The unique identifier of the field.
+ */
+/**
+ * Deserializes the beginning of a field.
+ * @returns {AnonReadFieldBeginReturn}
+ */
+TJSONProtocol.prototype.readFieldBegin = function() {
+ var r = {};
+
+ var fid = -1;
+ var ftype = Type.STOP;
+
+ //get a fieldId
+ for (var f in (this.rstack[this.rstack.length - 1])) {
+ if (f === null) {
+ continue;
+ }
+
+ fid = parseInt(f, 10);
+ this.rpos.push(this.rstack.length);
+
+ var field = this.rstack[this.rstack.length - 1][fid];
+
+ //remove so we don't see it again
+ delete this.rstack[this.rstack.length - 1][fid];
+
+ this.rstack.push(field);
+
+ break;
+ }
+
+ if (fid != -1) {
+ //should only be 1 of these but this is the only
+ //way to match a key
+ for (var i in (this.rstack[this.rstack.length - 1])) {
+ if (TJSONProtocol.RType[i] === null) {
+ continue;
+ }
+
+ ftype = TJSONProtocol.RType[i];
+ this.rstack[this.rstack.length - 1] = this.rstack[this.rstack.length - 1][i];
+ }
+ }
+
+ r.fname = '';
+ r.ftype = ftype;
+ r.fid = fid;
+
+ return r;
+};
+
+/** Deserializes the end of a field. */
+TJSONProtocol.prototype.readFieldEnd = function() {
+ var pos = this.rpos.pop();
+
+ //get back to the right place in the stack
+ while (this.rstack.length > pos) {
+ this.rstack.pop();
+ }
+};
+
+/**
+ * @class
+ * @name AnonReadMapBeginReturn
+ * @property {Thrift.Type} ktype - The data type of the key.
+ * @property {Thrift.Type} vtype - The data type of the value.
+ * @property {number} size - The number of elements in the map.
+ */
+/**
+ * Deserializes the beginning of a map.
+ * @returns {AnonReadMapBeginReturn}
+ */
+TJSONProtocol.prototype.readMapBegin = function() {
+ var map = this.rstack.pop();
+
+ var r = {};
+ r.ktype = TJSONProtocol.RType[map.shift()];
+ r.vtype = TJSONProtocol.RType[map.shift()];
+ r.size = map.shift();
+
+
+ this.rpos.push(this.rstack.length);
+ this.rstack.push(map.shift());
+
+ return r;
+};
+
+/** Deserializes the end of a map. */
+TJSONProtocol.prototype.readMapEnd = function() {
+ this.readFieldEnd();
+};
+
+/**
+ * @class
+ * @name AnonReadColBeginReturn
+ * @property {Thrift.Type} etype - The data type of the element.
+ * @property {number} size - The number of elements in the collection.
+ */
+/**
+ * Deserializes the beginning of a list.
+ * @returns {AnonReadColBeginReturn}
+ */
+TJSONProtocol.prototype.readListBegin = function() {
+ var list = this.rstack[this.rstack.length - 1];
+
+ var r = {};
+ r.etype = TJSONProtocol.RType[list.shift()];
+ r.size = list.shift();
+
+ this.rpos.push(this.rstack.length);
+ this.rstack.push(list);
+
+ return r;
+};
+
+/** Deserializes the end of a list. */
+TJSONProtocol.prototype.readListEnd = function() {
+ this.readFieldEnd();
+};
+
+/**
+ * Deserializes the beginning of a set.
+ * @returns {AnonReadColBeginReturn}
+ */
+TJSONProtocol.prototype.readSetBegin = function() {
+ return this.readListBegin();
+};
+
+/** Deserializes the end of a set. */
+TJSONProtocol.prototype.readSetEnd = function() {
+ return this.readListEnd();
+};
+
+/** Returns an object with a value property set to
+ * False unless the next number in the protocol buffer
+ * is 1, in which case the value property is True */
+TJSONProtocol.prototype.readBool = function() {
+ var r = this.readI32();
+
+ if (r !== null && r.value == '1') {
+ r.value = true;
+ } else {
+ r.value = false;
+ }
+
+ return r;
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readByte = function() {
+ return this.readI32();
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readI16 = function() {
+ return this.readI32();
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readI32 = function(f) {
+ if (f === undefined) {
+ f = this.rstack[this.rstack.length - 1];
+ }
+
+ var r = {};
+
+ if (f instanceof Array) {
+ if (f.length === 0) {
+ r.value = undefined;
+ } else {
+ r.value = f.shift();
+ }
+ } else if (f instanceof Object) {
+ for (var i in f) {
+ if (i === null) {
+ continue;
+ }
+ this.rstack.push(f[i]);
+ delete f[i];
+
+ r.value = i;
+ break;
+ }
+ } else {
+ r.value = f;
+ this.rstack.pop();
+ }
+
+ return r.value;
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readI64 = function() {
+ return new Int64(this.readI32());
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readDouble = function() {
+ return this.readI32();
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readBinary = function() {
+ return this.readString();
+};
+
+/** Returns the an object with a value property set to the
+ next value found in the protocol buffer */
+TJSONProtocol.prototype.readString = function() {
+ var r = this.readI32();
+ return r;
+};
+
+/**
+ * Returns the underlying transport.
+ * @readonly
+ * @returns {Thrift.Transport} The underlying transport.
+ */
+TJSONProtocol.prototype.getTransport = function() {
+ return this.trans;
+};
+
+/**
+ * Method to arbitrarily skip over data
+ */
+TJSONProtocol.prototype.skip = function(type) {
+ throw 'skip not supported yet';
+};
diff --git a/lib/nodejs/lib/thrift/log.js b/lib/nodejs/lib/thrift/log.js
new file mode 100644
index 0000000..0e13ea8
--- /dev/null
+++ b/lib/nodejs/lib/thrift/log.js
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+module.exports = {
+ 'info' : function logInfo() {},
+ 'warning' : function logWarning() {},
+ 'error' : function logError() {},
+ 'debug' : function logDebug() {},
+ 'trace' : function logTrace() {}
+};
diff --git a/lib/nodejs/lib/thrift/multiplexed_processor.js b/lib/nodejs/lib/thrift/multiplexed_processor.js
index fbceb79..67b62f7 100644
--- a/lib/nodejs/lib/thrift/multiplexed_processor.js
+++ b/lib/nodejs/lib/thrift/multiplexed_processor.js
@@ -18,41 +18,46 @@
*/
var Thrift = require('./thrift');
-var MultiplexedProcessor = exports.MultiplexedProcessor = function(stream, options) {
- this.services = {};
+exports.MultiplexedProcessor = MultiplexedProcessor;
+
+function MultiplexedProcessor(stream, options) {
+ this.services = {};
};
MultiplexedProcessor.prototype.registerProcessor = function(name, handler) {
- this.services[name] = handler;
+ this.services[name] = handler;
};
MultiplexedProcessor.prototype.process = function(inp, out) {
- var begin = inp.readMessageBegin();
- if (begin.mtype != Thrift.MessageType.CALL && begin.mtype != Thrift.MessageType.ONEWAY) {
- throw new Thrift.TException("TMultiplexedProcessor: Unexpected message type");
- }
+ var begin = inp.readMessageBegin();
- var p = begin.fname.split(":");
- var sname = p[0];
- var fname = p[1];
+ if (begin.mtype != Thrift.MessageType.CALL && begin.mtype != Thrift.MessageType.ONEWAY) {
+ throw new Thrift.TException('TMultiplexedProcessor: Unexpected message type');
+ }
- if (! (sname in this.services)) {
- throw new Thrift.TException("TMultiplexedProcessor: Unknown service: " + sname);
- }
+ var p = begin.fname.split(':');
+ var sname = p[0];
+ var fname = p[1];
- //construct a proxy object which stubs the readMessageBegin
- //for the service
- var inpProxy = {};
- for (var attr in inp) {
- inpProxy[attr] = inp[attr];
- }
- inpProxy.readMessageBegin = function() {
- return {
- fname: fname,
- mtype: begin.mtype,
- rseqid: begin.rseqid
- };
+ if (! (sname in this.services)) {
+ throw new Thrift.TException('TMultiplexedProcessor: Unknown service: ' + sname);
+ }
+
+ //construct a proxy object which stubs the readMessageBegin
+ //for the service
+ var inpProxy = {};
+
+ for (var attr in inp) {
+ inpProxy[attr] = inp[attr];
+ }
+
+ inpProxy.readMessageBegin = function() {
+ return {
+ fname: fname,
+ mtype: begin.mtype,
+ rseqid: begin.rseqid
};
+ };
- this.services[sname].process(inpProxy, out);
+ this.services[sname].process(inpProxy, out);
};
diff --git a/lib/nodejs/lib/thrift/multiplexed_protocol.js b/lib/nodejs/lib/thrift/multiplexed_protocol.js
index 68440af..0745a1b 100644
--- a/lib/nodejs/lib/thrift/multiplexed_protocol.js
+++ b/lib/nodejs/lib/thrift/multiplexed_protocol.js
@@ -19,49 +19,56 @@
var util = require('util');
var Thrift = require('./thrift');
-var Wrapper = exports.Wrapper = function(service_name, protocol, connection) {
+exports.Wrapper = Wrapper;
+exports.Multiplexer = Multiplexer;
- var MultiplexProtocol = function(trans, strictRead, strictWrite) {
- protocol.call(this, trans, strictRead, strictWrite);
- };
- util.inherits(MultiplexProtocol, protocol);
+function Wrapper(serviceName, protocol, connection) {
- MultiplexProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
- if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) {
- connection.seqId2Service[seqid] = service_name;
- MultiplexProtocol.super_.prototype.writeMessageBegin.call(this,
- service_name + ":" + name,
- type,
- seqid);
- } else {
- MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, name, type, seqid);
- }
- };
+ function MultiplexProtocol(trans, strictRead, strictWrite) {
+ protocol.call(this, trans, strictRead, strictWrite);
+ };
- return MultiplexProtocol;
+ util.inherits(MultiplexProtocol, protocol);
+
+ MultiplexProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
+ if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) {
+ connection.seqId2Service[seqid] = serviceName;
+ MultiplexProtocol.super_.prototype.writeMessageBegin.call(this,
+ serviceName + ":" + name,
+ type,
+ seqid);
+ } else {
+ MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, name, type, seqid);
+ }
+ };
+
+ return MultiplexProtocol;
};
-var Multiplexer = exports.Multiplexer = function() {
- this.seqid = 0;
+function Multiplexer() {
+ this.seqid = 0;
};
-Multiplexer.prototype.createClient = function(service_name, cls, connection) {
- if (cls.Client) {
- cls = cls.Client;
- }
- var self = this;
- cls.prototype.new_seqid = function() {
- self.seqid += 1;
- return self.seqid;
- };
- var client = new cls(new connection.transport(undefined, function(buf) {
- connection.write(buf);
- }), new Wrapper(service_name, connection.protocol, connection));
-
- if (typeof connection.client !== 'object') {
- connection.client = {};
- }
- connection.client[service_name] = client;
+Multiplexer.prototype.createClient = function(serviceName, ServiceClient, connection) {
+ if (ServiceClient.Client) {
+ ServiceClient = ServiceClient.Client;
+ }
+ var self = this;
+ ServiceClient.prototype.new_seqid = function() {
+ self.seqid += 1;
+ return self.seqid;
+ };
+ var writeCb = function(buf, seqid) {
+ connection.write(buf,seqid);
+ };
+ var transport = new connection.transport(undefined, writeCb);
+ var protocolWrapper = new Wrapper(serviceName, connection.protocol, connection);
+ var client = new ServiceClient(transport, protocolWrapper);
- return client;
+ if (typeof connection.client !== 'object') {
+ connection.client = {};
+ }
+ connection.client[serviceName] = client;
+
+ return client;
};
diff --git a/lib/nodejs/lib/thrift/protocol.js b/lib/nodejs/lib/thrift/protocol.js
index 6c8e8e6..a70ebe2 100644
--- a/lib/nodejs/lib/thrift/protocol.js
+++ b/lib/nodejs/lib/thrift/protocol.js
@@ -16,1728 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-var util = require('util'),
- Thrift = require('./thrift'),
- Type = Thrift.Type;
-var binary = require('./binary'),
- Int64 = require('node-int64');
-
-var InputBufferUnderrunError = require('./transport').InputBufferUnderrunError;
-
-//
-// BINARY PROTOCOL
-//
-///////////////////////////////////////////////////////////
-
-// JavaScript supports only numeric doubles, therefore even hex values are always signed.
-// The largest integer value which can be represented in JavaScript is +/-2^53.
-// Bitwise operations convert numbers to 32 bit integers but perform sign extension
-// upon assigning values back to variables.
-var VERSION_MASK = -65536, // 0xffff0000
- VERSION_1 = -2147418112, // 0x80010000
- TYPE_MASK = 0x000000ff;
-
-var POW_8 = Math.pow(2, 8);
-var POW_24 = Math.pow(2, 24);
-var POW_32 = Math.pow(2, 32);
-var POW_40 = Math.pow(2, 40);
-var POW_48 = Math.pow(2, 48);
-var POW_52 = Math.pow(2, 52);
-var POW_1022 = Math.pow(2, 1022);
-
-var TBinaryProtocol = exports.TBinaryProtocol = function(trans, strictRead, strictWrite) {
- this.trans = trans;
- this.strictRead = (strictRead !== undefined ? strictRead : false);
- this.strictWrite = (strictWrite !== undefined ? strictWrite : true);
-};
-
-TBinaryProtocol.prototype.flush = function() {
- return this.trans.flush();
-};
-
-TBinaryProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
- if (this.strictWrite) {
- this.writeI32(VERSION_1 | type);
- this.writeString(name);
- this.writeI32(seqid);
- } else {
- this.writeString(name);
- this.writeByte(type);
- this.writeI32(seqid);
- }
-};
-
-TBinaryProtocol.prototype.writeMessageEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeStructBegin = function(name) {
-};
-
-TBinaryProtocol.prototype.writeStructEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeFieldBegin = function(name, type, id) {
- this.writeByte(type);
- this.writeI16(id);
-};
-
-TBinaryProtocol.prototype.writeFieldEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeFieldStop = function() {
- this.writeByte(Type.STOP);
-};
-
-TBinaryProtocol.prototype.writeMapBegin = function(ktype, vtype, size) {
- this.writeByte(ktype);
- this.writeByte(vtype);
- this.writeI32(size);
-};
-
-TBinaryProtocol.prototype.writeMapEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeListBegin = function(etype, size) {
- this.writeByte(etype);
- this.writeI32(size);
-};
-
-TBinaryProtocol.prototype.writeListEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeSetBegin = function(etype, size) {
- this.writeByte(etype);
- this.writeI32(size);
-};
-
-TBinaryProtocol.prototype.writeSetEnd = function() {
-};
-
-TBinaryProtocol.prototype.writeBool = function(bool) {
- if (bool) {
- this.writeByte(1);
- } else {
- this.writeByte(0);
- }
-};
-
-TBinaryProtocol.prototype.writeByte = function(b) {
- this.trans.write(new Buffer([b]));
-};
-
-TBinaryProtocol.prototype.writeI16 = function(i16) {
- this.trans.write(binary.writeI16(new Buffer(2), i16));
-};
-
-TBinaryProtocol.prototype.writeI32 = function(i32) {
- this.trans.write(binary.writeI32(new Buffer(4), i32));
-};
-
-TBinaryProtocol.prototype.writeI64 = function(i64) {
- if (i64.buffer) {
- this.trans.write(i64.buffer);
- } else {
- this.trans.write(new Int64(i64).buffer);
- }
-};
-
-TBinaryProtocol.prototype.writeDouble = function(dub) {
- this.trans.write(binary.writeDouble(new Buffer(8), dub));
-};
-
-TBinaryProtocol.prototype.writeString = function(arg) {
- if (typeof(arg) === 'string') {
- this.writeI32(Buffer.byteLength(arg, 'utf8'));
- this.trans.write(arg, 'utf8');
- } else if (arg instanceof Buffer) {
- this.writeI32(arg.length);
- this.trans.write(arg);
- } else {
- throw new Error('writeString called without a string/Buffer argument: ' + arg);
- }
-};
-
-TBinaryProtocol.prototype.writeBinary = function(arg) {
- if (typeof(arg) === 'string') {
- this.writeI32(Buffer.byteLength(arg, 'utf8'));
- this.trans.write(arg, 'utf8');
- } else if ((arg instanceof Buffer) ||
- (Object.prototype.toString.call(arg) == '[object Uint8Array]')) {
- // Buffers in Node.js under Browserify may extend UInt8Array instead of
- // defining a new object. We detect them here so we can write them
- // correctly
- this.writeI32(arg.length);
- this.trans.write(arg);
- } else {
- throw new Error('writeBinary called without a string/Buffer argument: ' + arg);
- }
-};
-
-TBinaryProtocol.prototype.readMessageBegin = function() {
- var sz = this.readI32();
- var type, name, seqid;
-
- if (sz < 0) {
- var version = sz & VERSION_MASK;
- if (version != VERSION_1) {
- console.log("BAD: " + version);
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad version in readMessageBegin: " + sz);
- }
- type = sz & TYPE_MASK;
- name = this.readString();
- seqid = this.readI32();
- } else {
- if (this.strictRead) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "No protocol version header");
- }
- name = this.trans.read(sz);
- type = this.readByte();
- seqid = this.readI32();
- }
- return {fname: name, mtype: type, rseqid: seqid};
-};
-
-TBinaryProtocol.prototype.readMessageEnd = function() {
-};
-
-TBinaryProtocol.prototype.readStructBegin = function() {
- return {fname: ''};
-};
-
-TBinaryProtocol.prototype.readStructEnd = function() {
-};
-
-TBinaryProtocol.prototype.readFieldBegin = function() {
- var type = this.readByte();
- if (type == Type.STOP) {
- return {fname: null, ftype: type, fid: 0};
- }
- var id = this.readI16();
- return {fname: null, ftype: type, fid: id};
-};
-
-TBinaryProtocol.prototype.readFieldEnd = function() {
-};
-
-TBinaryProtocol.prototype.readMapBegin = function() {
- var ktype = this.readByte();
- var vtype = this.readByte();
- var size = this.readI32();
- return {ktype: ktype, vtype: vtype, size: size};
-};
-
-TBinaryProtocol.prototype.readMapEnd = function() {
-};
-
-TBinaryProtocol.prototype.readListBegin = function() {
- var etype = this.readByte();
- var size = this.readI32();
- return {etype: etype, size: size};
-};
-
-TBinaryProtocol.prototype.readListEnd = function() {
-};
-
-TBinaryProtocol.prototype.readSetBegin = function() {
- var etype = this.readByte();
- var size = this.readI32();
- return {etype: etype, size: size};
-};
-
-TBinaryProtocol.prototype.readSetEnd = function() {
-};
-
-TBinaryProtocol.prototype.readBool = function() {
- var b = this.readByte();
- if (b === 0) {
- return false;
- }
- return true;
-};
-
-TBinaryProtocol.prototype.readByte = function() {
- return this.trans.readByte();
-};
-
-TBinaryProtocol.prototype.readI16 = function() {
- return this.trans.readI16();
-};
-
-TBinaryProtocol.prototype.readI32 = function() {
- return this.trans.readI32();
-};
-
-TBinaryProtocol.prototype.readI64 = function() {
- var buff = this.trans.read(8);
- return new Int64(buff);
-};
-
-TBinaryProtocol.prototype.readDouble = function() {
- return this.trans.readDouble();
-};
-
-TBinaryProtocol.prototype.readBinary = function() {
- var len = this.readI32();
- return this.trans.read(len);
-};
-
-TBinaryProtocol.prototype.readString = function() {
- var len = this.readI32();
- return this.trans.readString(len);
-};
-
-TBinaryProtocol.prototype.getTransport = function() {
- return this.trans;
-};
-
-TBinaryProtocol.prototype.skip = function(type) {
- switch (type) {
- case Type.STOP:
- return;
- case Type.BOOL:
- this.readBool();
- break;
- case Type.BYTE:
- this.readByte();
- break;
- case Type.I16:
- this.readI16();
- break;
- case Type.I32:
- this.readI32();
- break;
- case Type.I64:
- this.readI64();
- break;
- case Type.DOUBLE:
- this.readDouble();
- break;
- case Type.STRING:
- this.readString();
- break;
- case Type.STRUCT:
- this.readStructBegin();
- while (true) {
- var r = this.readFieldBegin();
- if (r.ftype === Type.STOP) {
- break;
- }
- this.skip(r.ftype);
- this.readFieldEnd();
- }
- this.readStructEnd();
- break;
- case Type.MAP:
- var mapBegin = this.readMapBegin();
- for (var i = 0; i < mapBegin.size; ++i) {
- this.skip(mapBegin.ktype);
- this.skip(mapBegin.vtype);
- }
- this.readMapEnd();
- break;
- case Type.SET:
- var setBegin = this.readSetBegin();
- for (var i2 = 0; i2 < setBegin.size; ++i2) {
- this.skip(setBegin.etype);
- }
- this.readSetEnd();
- break;
- case Type.LIST:
- var listBegin = this.readListBegin();
- for (var i3 = 0; i3 < listBegin.size; ++i3) {
- this.skip(listBegin.etype);
- }
- this.readListEnd();
- break;
- default:
- throw new Error("Invalid type: " + type);
- }
-};
-
-
-//
-// COMPACT PROTOCOL
-//
-///////////////////////////////////////////////////////////
-
-/**
- * Constructor Function for the Compact Protocol.
- * @constructor
- * @param {object} [trans] - The underlying transport to read/write.
- * @classdesc The Apache Thrift Protocol layer performs serialization
- * of base types, the compact protocol serializes data in binary
- * form with minimal space used for scalar values.
- */
-var TCompactProtocol = exports.TCompactProtocol = function(trans) {
- this.trans = trans;
- this.lastField_ = [];
- this.lastFieldId_ = 0;
- this.string_limit_ = 0;
- this.string_buf_ = null;
- this.string_buf_size_ = 0;
- this.container_limit_ = 0;
- this.booleanField_ = {
- name: null,
- hasBoolValue: false
- };
- this.boolValue_ = {
- hasBoolValue: false,
- boolValue: false
- };
-};
-
-
-//
-// Compact Protocol Constants
-//
-
-/**
- * Compact Protocol ID number.
- * @readonly
- * @const {number} PROTOCOL_ID
- */
-TCompactProtocol.PROTOCOL_ID = -126; //1000 0010
-
-/**
- * Compact Protocol version number.
- * @readonly
- * @const {number} VERSION_N
- */
-TCompactProtocol.VERSION_N = 1;
-
-/**
- * Compact Protocol version mask for combining protocol version and message type in one byte.
- * @readonly
- * @const {number} VERSION_MASK
- */
-TCompactProtocol.VERSION_MASK = 0x1f; //0001 1111
-
-/**
- * Compact Protocol message type mask for combining protocol version and message type in one byte.
- * @readonly
- * @const {number} TYPE_MASK
- */
-TCompactProtocol.TYPE_MASK = -32; //1110 0000
-
-/**
- * Compact Protocol message type bits for ensuring message type bit size.
- * @readonly
- * @const {number} TYPE_BITS
- */
-TCompactProtocol.TYPE_BITS = 7; //0000 0111
-
-/**
- * Compact Protocol message type shift amount for combining protocol version and message type in one byte.
- * @readonly
- * @const {number} TYPE_SHIFT_AMOUNT
- */
-TCompactProtocol.TYPE_SHIFT_AMOUNT = 5;
-
-/**
- * Compact Protocol type IDs used to keep type data within one nibble.
- * @readonly
- * @property {number} CT_STOP - End of a set of fields.
- * @property {number} CT_BOOLEAN_TRUE - Flag for Boolean field with true value (packed field and value).
- * @property {number} CT_BOOLEAN_FALSE - Flag for Boolean field with false value (packed field and value).
- * @property {number} CT_BYTE - Signed 8 bit integer.
- * @property {number} CT_I16 - Signed 16 bit integer.
- * @property {number} CT_I32 - Signed 32 bit integer.
- * @property {number} CT_I64 - Signed 64 bit integer (2^53 max in JavaScript).
- * @property {number} CT_DOUBLE - 64 bit IEEE 854 floating point.
- * @property {number} CT_BINARY - Array of bytes (used for strings also).
- * @property {number} CT_LIST - A collection type (unordered).
- * @property {number} CT_SET - A collection type (unordered and without repeated values).
- * @property {number} CT_MAP - A collection type (map/associative-array/dictionary).
- * @property {number} CT_STRUCT - A multifield type.
- */
-TCompactProtocol.Types = {
- CT_STOP: 0x00,
- CT_BOOLEAN_TRUE: 0x01,
- CT_BOOLEAN_FALSE: 0x02,
- CT_BYTE: 0x03,
- CT_I16: 0x04,
- CT_I32: 0x05,
- CT_I64: 0x06,
- CT_DOUBLE: 0x07,
- CT_BINARY: 0x08,
- CT_LIST: 0x09,
- CT_SET: 0x0A,
- CT_MAP: 0x0B,
- CT_STRUCT: 0x0C
-};
-
-/**
- * Array mapping Compact type IDs to standard Thrift type IDs.
- * @readonly
- */
-TCompactProtocol.TTypeToCType = [
- TCompactProtocol.Types.CT_STOP, // T_STOP
- 0, // unused
- TCompactProtocol.Types.CT_BOOLEAN_TRUE, // T_BOOL
- TCompactProtocol.Types.CT_BYTE, // T_BYTE
- TCompactProtocol.Types.CT_DOUBLE, // T_DOUBLE
- 0, // unused
- TCompactProtocol.Types.CT_I16, // T_I16
- 0, // unused
- TCompactProtocol.Types.CT_I32, // T_I32
- 0, // unused
- TCompactProtocol.Types.CT_I64, // T_I64
- TCompactProtocol.Types.CT_BINARY, // T_STRING
- TCompactProtocol.Types.CT_STRUCT, // T_STRUCT
- TCompactProtocol.Types.CT_MAP, // T_MAP
- TCompactProtocol.Types.CT_SET, // T_SET
- TCompactProtocol.Types.CT_LIST, // T_LIST
-];
-
-
-//
-// Compact Protocol Utilities
-//
-
-/**
- * Returns the underlying transport layer.
- * @return {object} The underlying transport layer.
- */TCompactProtocol.prototype.getTransport = function() {
- return this.trans;
-};
-
-/**
- * Lookup a Compact Protocol Type value for a given Thrift Type value.
- * N.B. Used only internally.
- * @param {number} ttype - Thrift type value
- * @returns {number} Compact protocol type value
- */
-TCompactProtocol.prototype.getCompactType = function(ttype) {
- return TCompactProtocol.TTypeToCType[ttype];
-};
-
-/**
- * Lookup a Thrift Type value for a given Compact Protocol Type value.
- * N.B. Used only internally.
- * @param {number} type - Compact Protocol type value
- * @returns {number} Thrift Type value
- */
-TCompactProtocol.prototype.getTType = function(type) {
- switch (type) {
- case Type.STOP:
- return Type.STOP;
- case TCompactProtocol.Types.CT_BOOLEAN_FALSE:
- case TCompactProtocol.Types.CT_BOOLEAN_TRUE:
- return Type.BOOL;
- case TCompactProtocol.Types.CT_BYTE:
- return Type.BYTE;
- case TCompactProtocol.Types.CT_I16:
- return Type.I16;
- case TCompactProtocol.Types.CT_I32:
- return Type.I32;
- case TCompactProtocol.Types.CT_I64:
- return Type.I64;
- case TCompactProtocol.Types.CT_DOUBLE:
- return Type.DOUBLE;
- case TCompactProtocol.Types.CT_BINARY:
- return Type.STRING;
- case TCompactProtocol.Types.CT_LIST:
- return Type.LIST;
- case TCompactProtocol.Types.CT_SET:
- return Type.SET;
- case TCompactProtocol.Types.CT_MAP:
- return Type.MAP;
- case TCompactProtocol.Types.CT_STRUCT:
- return Type.STRUCT;
- default:
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Unknown type: " + type);
- }
- return Type.STOP;
-};
-
-
-//
-// Compact Protocol write operations
-//
-
-/**
- * Send any buffered bytes to the end point.
- */
-TCompactProtocol.prototype.flush = function() {
- return this.trans.flush();
-};
-
-/**
- * Writes an RPC message header
- * @param {string} name - The method name for the message.
- * @param {number} type - The type of message (CALL, REPLY, EXCEPTION, ONEWAY).
- * @param {number} seqid - The call sequence number (if any).
- */
-TCompactProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
- this.writeByte(TCompactProtocol.PROTOCOL_ID);
- this.writeByte((TCompactProtocol.VERSION_N & TCompactProtocol.VERSION_MASK) |
- ((type << TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_MASK));
- this.writeVarint32(seqid);
- this.writeString(name);
-};
-
-TCompactProtocol.prototype.writeMessageEnd = function() {
-};
-
-TCompactProtocol.prototype.writeStructBegin = function(name) {
- this.lastField_.push(this.lastFieldId_);
- this.lastFieldId_ = 0;
-};
-
-TCompactProtocol.prototype.writeStructEnd = function() {
- this.lastFieldId_ = this.lastField_.pop();
-};
-
-/**
- * Writes a struct field header
- * @param {string} name - The field name (not written with the compact protocol).
- * @param {number} type - The field data type (a normal Thrift field Type).
- * @param {number} id - The IDL field Id.
- */
-TCompactProtocol.prototype.writeFieldBegin = function(name, type, id) {
- if (type != Type.BOOL) {
- return this.writeFieldBeginInternal(name, type, id, -1);
- }
-
- this.booleanField_.name = name;
- this.booleanField_.fieldType = type;
- this.booleanField_.fieldId = id;
-};
-
-TCompactProtocol.prototype.writeFieldEnd = function() {
-};
-
-TCompactProtocol.prototype.writeFieldStop = function() {
- this.writeByte(TCompactProtocol.Types.CT_STOP);
-};
-
-/**
- * Writes a map collection header
- * @param {number} keyType - The Thrift type of the map keys.
- * @param {number} valType - The Thrift type of the map values.
- * @param {number} size - The number of k/v pairs in the map.
- */
-TCompactProtocol.prototype.writeMapBegin = function(keyType, valType, size) {
- if (size === 0) {
- this.writeByte(0);
- } else {
- this.writeVarint32(size);
- this.writeByte(this.getCompactType(keyType) << 4 | this.getCompactType(valType));
- }
-};
-
-TCompactProtocol.prototype.writeMapEnd = function() {
-};
-
-/**
- * Writes a list collection header
- * @param {number} elemType - The Thrift type of the list elements.
- * @param {number} size - The number of elements in the list.
- */
-TCompactProtocol.prototype.writeListBegin = function(elemType, size) {
- this.writeCollectionBegin(elemType, size);
-};
-
-TCompactProtocol.prototype.writeListEnd = function() {
-};
-
-/**
- * Writes a set collection header
- * @param {number} elemType - The Thrift type of the set elements.
- * @param {number} size - The number of elements in the set.
- */
-TCompactProtocol.prototype.writeSetBegin = function(elemType, size) {
- this.writeCollectionBegin(elemType, size);
-};
-
-TCompactProtocol.prototype.writeSetEnd = function() {
-};
-
-TCompactProtocol.prototype.writeBool = function(value) {
- if (this.booleanField_.name !== null) {
- // we haven't written the field header yet
- this.writeFieldBeginInternal(this.booleanField_.name,
- this.booleanField_.fieldType,
- this.booleanField_.fieldId,
- (value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE
- : TCompactProtocol.Types.CT_BOOLEAN_FALSE));
- this.booleanField_.name = null;
- } else {
- // we're not part of a field, so just write the value
- this.writeByte((value ? TCompactProtocol.Types.CT_BOOLEAN_TRUE
- : TCompactProtocol.Types.CT_BOOLEAN_FALSE));
- }
-};
-
-TCompactProtocol.prototype.writeByte = function(b) {
- this.trans.write(new Buffer([b]));
-};
-
-TCompactProtocol.prototype.writeI16 = function(i16) {
- this.writeVarint32(this.i32ToZigzag(i16));
-};
-
-TCompactProtocol.prototype.writeI32 = function(i32) {
- this.writeVarint32(this.i32ToZigzag(i32));
-};
-
-TCompactProtocol.prototype.writeI64 = function(i64) {
- this.writeVarint64(this.i64ToZigzag(i64));
-};
-
-// Little-endian, unlike TBinaryProtocol
-TCompactProtocol.prototype.writeDouble = function(v) {
- var buff = new Buffer(8);
- var m, e, c;
-
- buff[7] = (v < 0 ? 0x80 : 0x00);
-
- v = Math.abs(v);
- if (v !== v) {
- // NaN, use QNaN IEEE format
- m = 2251799813685248;
- e = 2047;
- } else if (v === Infinity) {
- m = 0;
- e = 2047;
- } else {
- e = Math.floor(Math.log(v) / Math.LN2);
- c = Math.pow(2, -e);
- if (v * c < 1) {
- e--;
- c *= 2;
- }
-
- if (e + 1023 >= 2047)
- {
- // Overflow
- m = 0;
- e = 2047;
- }
- else if (e + 1023 >= 1)
- {
- // Normalized - term order matters, as Math.pow(2, 52-e) and v*Math.pow(2, 52) can overflow
- m = (v*c-1) * POW_52;
- e += 1023;
- }
- else
- {
- // Denormalized - also catches the '0' case, somewhat by chance
- m = (v * POW_1022) * POW_52;
- e = 0;
- }
- }
-
- buff[6] = (e << 4) & 0xf0;
- buff[7] |= (e >> 4) & 0x7f;
-
- buff[0] = m & 0xff;
- m = Math.floor(m / POW_8);
- buff[1] = m & 0xff;
- m = Math.floor(m / POW_8);
- buff[2] = m & 0xff;
- m = Math.floor(m / POW_8);
- buff[3] = m & 0xff;
- m >>= 8;
- buff[4] = m & 0xff;
- m >>= 8;
- buff[5] = m & 0xff;
- m >>= 8;
- buff[6] |= m & 0x0f;
-
- this.trans.write(buff);
-};
-
-TCompactProtocol.prototype.writeString = function(arg) {
- this.writeBinary(arg);
-};
-
-TCompactProtocol.prototype.writeBinary = function(arg) {
- if (typeof arg === 'string') {
- this.writeVarint32(Buffer.byteLength(arg, 'utf8')) ;
- this.trans.write(arg, 'utf8');
- } else if (arg instanceof Buffer) {
- this.writeVarint32(arg.length);
- this.trans.write(arg);
- } else {
- throw new Error('writeString/writeBinary called without a string/Buffer argument: ' + arg);
- }
-};
-
-
-//
-// Compact Protocol internal write methods
-//
-
-TCompactProtocol.prototype.writeFieldBeginInternal = function(name,
- fieldType,
- fieldId,
- typeOverride) {
- //If there's a type override, use that.
- var typeToWrite = (typeOverride == -1 ? this.getCompactType(fieldType) : typeOverride);
- //Check if we can delta encode the field id
- if (fieldId > this.lastFieldId_ && fieldId - this.lastFieldId_ <= 15) {
- //Include the type delta with the field ID
- this.writeByte((fieldId - this.lastFieldId_) << 4 | typeToWrite);
- } else {
- //Write separate type and ID values
- this.writeByte(typeToWrite);
- this.writeI16(fieldId);
- }
- this.lastFieldId_ = fieldId;
-};
-
-TCompactProtocol.prototype.writeCollectionBegin = function(elemType, size) {
- if (size <= 14) {
- //Combine size and type in one byte if possible
- this.writeByte(size << 4 | this.getCompactType(elemType));
- } else {
- this.writeByte(0xf0 | this.getCompactType(elemType));
- this.writeVarint32(size);
- }
-};
-
-/**
- * Write an i32 as a varint. Results in 1-5 bytes on the wire.
- */
-TCompactProtocol.prototype.writeVarint32 = function(n) {
- var buf = new Buffer(5);
- var wsize = 0;
- while (true) {
- if ((n & ~0x7F) === 0) {
- buf[wsize++] = n;
- break;
- } else {
- buf[wsize++] = ((n & 0x7F) | 0x80);
- n = n >>> 7;
- }
- }
- var wbuf = new Buffer(wsize);
- buf.copy(wbuf,0,0,wsize);
- this.trans.write(wbuf);
-};
-
-/**
- * Write an i64 as a varint. Results in 1-10 bytes on the wire.
- * N.B. node-int64 is always big endian
- */
-TCompactProtocol.prototype.writeVarint64 = function(n) {
- if (typeof n === "number"){
- n = new Int64(n);
- }
- if (! (n instanceof Int64)) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + n);
- }
-
- var buf = new Buffer(10);
- var wsize = 0;
- var hi = n.buffer.readUInt32BE(0, true);
- var lo = n.buffer.readUInt32BE(4, true);
- var mask = 0;
- while (true) {
- if (((lo & ~0x7F) === 0) && (hi === 0)) {
- buf[wsize++] = lo;
- break;
- } else {
- buf[wsize++] = ((lo & 0x7F) | 0x80);
- mask = hi << 25;
- lo = lo >>> 7;
- hi = hi >>> 7;
- lo = lo | mask;
- }
- }
- var wbuf = new Buffer(wsize);
- buf.copy(wbuf,0,0,wsize);
- this.trans.write(wbuf);
-};
-
-/**
- * Convert l into a zigzag long. This allows negative numbers to be
- * represented compactly as a varint.
- */
-TCompactProtocol.prototype.i64ToZigzag = function(l) {
- if (typeof l === 'string') {
- l = new Int64(parseInt(l, 10));
- } else if (typeof l === 'number') {
- l = new Int64(l);
- }
- if (! (l instanceof Int64)) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Expected Int64 or Number, found: " + l);
- }
- var hi = l.buffer.readUInt32BE(0, true);
- var lo = l.buffer.readUInt32BE(4, true);
- var sign = hi >>> 31;
- hi = ((hi << 1) | (lo >>> 31)) ^ ((!!sign) ? 0xFFFFFFFF : 0);
- lo = (lo << 1) ^ ((!!sign) ? 0xFFFFFFFF : 0);
- return new Int64(hi, lo);
-};
-
-/**
- * Convert n into a zigzag int. This allows negative numbers to be
- * represented compactly as a varint.
- */
-TCompactProtocol.prototype.i32ToZigzag = function(n) {
- return (n << 1) ^ ((n & 0x80000000) ? 0xFFFFFFFF : 0);
-};
-
-
-//
-// Compact Protocol read operations
-//
-
-TCompactProtocol.prototype.readMessageBegin = function() {
- //Read protocol ID
- var protocolId = this.trans.readByte();
- if (protocolId != TCompactProtocol.PROTOCOL_ID) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol identifier " + protocolId);
- }
-
- //Read Version and Type
- var versionAndType = this.trans.readByte();
- var version = (versionAndType & TCompactProtocol.VERSION_MASK);
- if (version != TCompactProtocol.VERSION_N) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.BAD_VERSION, "Bad protocol version " + version);
- }
- var type = ((versionAndType >> TCompactProtocol.TYPE_SHIFT_AMOUNT) & TCompactProtocol.TYPE_BITS);
-
- //Read SeqId
- var seqid = this.readVarint32();
-
- //Read name
- var name = this.readString();
-
- return {fname: name, mtype: type, rseqid: seqid};
-};
-
-TCompactProtocol.prototype.readMessageEnd = function() {
-};
-
-TCompactProtocol.prototype.readStructBegin = function() {
- this.lastField_.push(this.lastFieldId_);
- this.lastFieldId_ = 0;
- return {fname: ''};
-};
-
-TCompactProtocol.prototype.readStructEnd = function() {
- this.lastFieldId_ = this.lastField_.pop();
-};
-
-TCompactProtocol.prototype.readFieldBegin = function() {
- var fieldId = 0;
- var b = this.trans.readByte(b);
- var type = (b & 0x0f);
-
- if (type == TCompactProtocol.Types.CT_STOP) {
- return {fname: null, ftype: Thrift.Type.STOP, fid: 0};
- }
-
- //Mask off the 4 MSB of the type header to check for field id delta.
- var modifier = ((b & 0x000000f0) >>> 4);
- if (modifier === 0) {
- //If not a delta read the field id.
- fieldId = this.readI16();
- } else {
- //Recover the field id from the delta
- fieldId = (this.lastFieldId_ + modifier);
- }
- var fieldType = this.getTType(type);
-
- //Boolean are encoded with the type
- if (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE ||
- type == TCompactProtocol.Types.CT_BOOLEAN_FALSE) {
- this.boolValue_.hasBoolValue = true;
- this.boolValue_.boolValue =
- (type == TCompactProtocol.Types.CT_BOOLEAN_TRUE ? true : false);
- }
-
- //Save the new field for the next delta computation.
- this.lastFieldId_ = fieldId;
- return {fname: null, ftype: fieldType, fid: fieldId};
-};
-
-TCompactProtocol.prototype.readFieldEnd = function() {
-};
-
-TCompactProtocol.prototype.readMapBegin = function() {
- var msize = this.readVarint32();
- if (msize < 0) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative map size");
- }
-
- var kvType = 0;
- if (msize !== 0) {
- kvType = this.trans.readByte();
- }
-
- var keyType = this.getTType((kvType & 0xf0) >>> 4);
- var valType = this.getTType(kvType & 0xf);
- return {ktype: keyType, vtype: valType, size: msize};
-};
-
-TCompactProtocol.prototype.readMapEnd = function() {
-};
-
-TCompactProtocol.prototype.readListBegin = function() {
- var size_and_type = this.trans.readByte();
-
- var lsize = (size_and_type >>> 4) & 0x0000000f;
- if (lsize == 15) {
- lsize = this.readVarint32();
- }
-
- if (lsize < 0) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative list size");
- }
-
- var elemType = this.getTType(size_and_type & 0x0000000f);
-
- return {etype: elemType, size: lsize};
-};
-
-TCompactProtocol.prototype.readListEnd = function() {
-};
-
-TCompactProtocol.prototype.readSetBegin = function() {
- return this.readListBegin();
-};
-
-TCompactProtocol.prototype.readSetEnd = function() {
-};
-
-TCompactProtocol.prototype.readBool = function() {
- var value = false;
- var rsize = 0;
- if (this.boolValue_.hasBoolValue === true) {
- value = this.boolValue_.boolValue;
- this.boolValue_.hasBoolValue = false;
- } else {
- var res = this.trans.readByte();
- rsize = res.rsize;
- value = (res.value == TCompactProtocol.Types.CT_BOOLEAN_TRUE);
- }
- return value;
-};
-
-TCompactProtocol.prototype.readByte = function() {
- return this.trans.readByte();
-};
-
-TCompactProtocol.prototype.readI16 = function() {
- return this.readI32();
-};
-
-TCompactProtocol.prototype.readI32 = function() {
- return this.zigzagToI32(this.readVarint32());
-};
-
-TCompactProtocol.prototype.readI64 = function() {
- return this.zigzagToI64(this.readVarint64());
-};
-
-// Little-endian, unlike TBinaryProtocol
-TCompactProtocol.prototype.readDouble = function() {
- var buff = this.trans.read(8);
- var off = 0;
-
- var signed = buff[off + 7] & 0x80;
- var e = (buff[off+6] & 0xF0) >> 4;
- e += (buff[off+7] & 0x7F) << 4;
-
- var m = buff[off];
- m += buff[off+1] << 8;
- m += buff[off+2] << 16;
- m += buff[off+3] * POW_24;
- m += buff[off+4] * POW_32;
- m += buff[off+5] * POW_40;
- m += (buff[off+6] & 0x0F) * POW_48;
-
- switch (e) {
- case 0:
- e = -1022;
- break;
- case 2047:
- return m ? NaN : (signed ? -Infinity : Infinity);
- default:
- m += POW_52;
- e -= 1023;
- }
-
- if (signed) {
- m *= -1;
- }
-
- return m * Math.pow(2, e - 52);
-};
-
-TCompactProtocol.prototype.readBinary = function() {
- var size = this.readVarint32();
- // Catch empty string case
- if (size === 0) {
- return "";
- }
-
- // Catch error cases
- if (size < 0) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.NEGATIVE_SIZE, "Negative binary/string size");
- }
- var value = this.trans.readString(size);
-
- return value;
-};
-
-TCompactProtocol.prototype.readString = function() {
- return this.readBinary();
-};
-
-
-//
-// Compact Protocol internal read operations
-//
-
-/**
- * Read an i32 from the wire as a varint. The MSB of each byte is set
- * if there is another byte to follow. This can read up to 5 bytes.
- */
-TCompactProtocol.prototype.readVarint32 = function() {
- return this.readVarint64();
-};
-
-/**
- * Read an i64 from the wire as a proper varint. The MSB of each byte is set
- * if there is another byte to follow. This can read up to 10 bytes.
- */
-TCompactProtocol.prototype.readVarint64 = function() {
- var rsize = 0;
- var lo = 0;
- var hi = 0;
- var shift = 0;
- while (true) {
- var b = this.trans.readByte();
- rsize ++;
- if (shift <= 25) {
- lo = lo | ((b & 0x7f) << shift);
- } else if (25 < shift && shift < 32) {
- lo = lo | ((b & 0x7f) << shift);
- hi = hi | ((b & 0x7f) >>> (32-shift));
- } else {
- hi = hi | ((b & 0x7f) << (shift-32));
- }
- shift += 7;
- if (!(b & 0x80)) {
- break;
- }
- if (rsize >= 10) {
- throw new Thrift.TProtocolException(Thrift.TProtocolExceptionType.INVALID_DATA, "Variable-length int over 10 bytes.");
- }
- }
- var i64 = new Int64(hi, lo);
- return i64.toNumber();
-};
-
-/**
- * Convert from zigzag int to int.
- */
-TCompactProtocol.prototype.zigzagToI32 = function(n) {
- return (n >>> 1) ^ (-1 * (n & 1));
-};
-
-/**
- * Convert from zigzag long to long.
- */
-TCompactProtocol.prototype.zigzagToI64 = function(n) {
- var zz = new Int64(n);
- var hi = zz.buffer.readUInt32BE(0, true);
- var lo = zz.buffer.readUInt32BE(4, true);
-
- var neg = new Int64(hi & 0, lo & 1);
- neg._2scomp();
- var hi_neg = neg.buffer.readUInt32BE(0, true);
- var lo_neg = neg.buffer.readUInt32BE(4, true);
-
- var hi_lo = (hi << 31);
- hi = (hi >>> 1) ^ (hi_neg);
- lo = ((lo >>> 1) | hi_lo) ^ (lo_neg);
- var i64 = new Int64(hi, lo);
- return i64.toNumber();
-};
-
-TCompactProtocol.prototype.skip = function(type) {
- switch (type) {
- case Type.STOP:
- return;
- case Type.BOOL:
- this.readBool();
- break;
- case Type.BYTE:
- this.readByte();
- break;
- case Type.I16:
- this.readI16();
- break;
- case Type.I32:
- this.readI32();
- break;
- case Type.I64:
- this.readI64();
- break;
- case Type.DOUBLE:
- this.readDouble();
- break;
- case Type.STRING:
- this.readString();
- break;
- case Type.STRUCT:
- this.readStructBegin();
- while (true) {
- var r = this.readFieldBegin();
- if (r.ftype === Type.STOP) {
- break;
- }
- this.skip(r.ftype);
- this.readFieldEnd();
- }
- this.readStructEnd();
- break;
- case Type.MAP:
- var mapBegin = this.readMapBegin();
- for (var i = 0; i < mapBegin.size; ++i) {
- this.skip(mapBegin.ktype);
- this.skip(mapBegin.vtype);
- }
- this.readMapEnd();
- break;
- case Type.SET:
- var setBegin = this.readSetBegin();
- for (var i2 = 0; i2 < setBegin.size; ++i2) {
- this.skip(setBegin.etype);
- }
- this.readSetEnd();
- break;
- case Type.LIST:
- var listBegin = this.readListBegin();
- for (var i3 = 0; i3 < listBegin.size; ++i3) {
- this.skip(listBegin.etype);
- }
- this.readListEnd();
- break;
- default:
- throw new Error("Invalid type: " + type);
- }
-};
-
-
-//
-// JSON PROTOCOL
-//
-///////////////////////////////////////////////////////////
-
-var TJSONProtocol = exports.TJSONProtocol = function(trans) {
- this.trans = trans;
-};
-
-TJSONProtocol.Type = {};
-TJSONProtocol.Type[Thrift.Type.BOOL] = '"tf"';
-TJSONProtocol.Type[Thrift.Type.BYTE] = '"i8"';
-TJSONProtocol.Type[Thrift.Type.I16] = '"i16"';
-TJSONProtocol.Type[Thrift.Type.I32] = '"i32"';
-TJSONProtocol.Type[Thrift.Type.I64] = '"i64"';
-TJSONProtocol.Type[Thrift.Type.DOUBLE] = '"dbl"';
-TJSONProtocol.Type[Thrift.Type.STRUCT] = '"rec"';
-TJSONProtocol.Type[Thrift.Type.STRING] = '"str"';
-TJSONProtocol.Type[Thrift.Type.MAP] = '"map"';
-TJSONProtocol.Type[Thrift.Type.LIST] = '"lst"';
-TJSONProtocol.Type[Thrift.Type.SET] = '"set"';
-
-
-TJSONProtocol.RType = {};
-TJSONProtocol.RType.tf = Thrift.Type.BOOL;
-TJSONProtocol.RType.i8 = Thrift.Type.BYTE;
-TJSONProtocol.RType.i16 = Thrift.Type.I16;
-TJSONProtocol.RType.i32 = Thrift.Type.I32;
-TJSONProtocol.RType.i64 = Thrift.Type.I64;
-TJSONProtocol.RType.dbl = Thrift.Type.DOUBLE;
-TJSONProtocol.RType.rec = Thrift.Type.STRUCT;
-TJSONProtocol.RType.str = Thrift.Type.STRING;
-TJSONProtocol.RType.map = Thrift.Type.MAP;
-TJSONProtocol.RType.lst = Thrift.Type.LIST;
-TJSONProtocol.RType.set = Thrift.Type.SET;
-
-TJSONProtocol.Version = 1;
-
-TJSONProtocol.prototype.flush = function() {
- return this.trans.flush();
-};
-
-TJSONProtocol.prototype.writeMessageBegin = function(name, messageType, seqid) {
- this.tstack = [];
- this.tpos = [];
-
- this.tstack.push([TJSONProtocol.Version, '"' + name + '"', messageType, seqid]);
-};
-
-TJSONProtocol.prototype.writeMessageEnd = function() {
- var obj = this.tstack.pop();
-
- this.wobj = this.tstack.pop();
- this.wobj.push(obj);
-
- this.wbuf = '[' + this.wobj.join(',') + ']';
-
- this.trans.write(this.wbuf);
-};
-
-TJSONProtocol.prototype.writeStructBegin = function(name) {
- this.tpos.push(this.tstack.length);
- this.tstack.push({});
-};
-
-TJSONProtocol.prototype.writeStructEnd = function() {
- var p = this.tpos.pop();
- var struct = this.tstack[p];
- var str = '{';
- var first = true;
- for (var key in struct) {
- if (first) {
- first = false;
- } else {
- str += ',';
- }
-
- str += key + ':' + struct[key];
- }
-
- str += '}';
- this.tstack[p] = str;
-};
-
-TJSONProtocol.prototype.writeFieldBegin = function(name, fieldType, fieldId) {
- this.tpos.push(this.tstack.length);
- this.tstack.push({ 'fieldId': '"' +
- fieldId + '"', 'fieldType': TJSONProtocol.Type[fieldType]
- });
-};
-
-TJSONProtocol.prototype.writeFieldEnd = function() {
- var value = this.tstack.pop();
- var fieldInfo = this.tstack.pop();
-
- if (':' + value === ":[object Object]") {
- this.tstack[this.tstack.length - 1][fieldInfo.fieldId] = '{' +
- fieldInfo.fieldType + ':' + JSON.stringify(value) + '}';
- } else {
- this.tstack[this.tstack.length - 1][fieldInfo.fieldId] = '{' +
- fieldInfo.fieldType + ':' + value + '}';
- }
- this.tpos.pop();
-};
-
-TJSONProtocol.prototype.writeFieldStop = function() {
-};
-
-TJSONProtocol.prototype.writeMapBegin = function(ktype, vtype, size) {
- //size is invalid, we'll set it on end.
- this.tpos.push(this.tstack.length);
- this.tstack.push([TJSONProtocol.Type[ktype], TJSONProtocol.Type[vtype], 0]);
-};
-
-TJSONProtocol.prototype.writeMapEnd = function() {
- var p = this.tpos.pop();
-
- if (p == this.tstack.length) {
- return;
- }
-
- if ((this.tstack.length - p - 1) % 2 !== 0) {
- this.tstack.push('');
- }
-
- var size = (this.tstack.length - p - 1) / 2;
-
- this.tstack[p][this.tstack[p].length - 1] = size;
-
- var map = '}';
- var first = true;
- while (this.tstack.length > p + 1) {
- var v = this.tstack.pop();
- var k = this.tstack.pop();
- if (first) {
- first = false;
- } else {
- map = ',' + map;
- }
-
- if (! isNaN(k)) { k = '"' + k + '"'; } //json "keys" need to be strings
- map = k + ':' + v + map;
- }
- map = '{' + map;
-
- this.tstack[p].push(map);
- this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
-};
-
-TJSONProtocol.prototype.writeListBegin = function(etype, size) {
- this.tpos.push(this.tstack.length);
- this.tstack.push([TJSONProtocol.Type[etype], size]);
-};
-
-TJSONProtocol.prototype.writeListEnd = function() {
- var p = this.tpos.pop();
-
- while (this.tstack.length > p + 1) {
- var tmpVal = this.tstack[p + 1];
- this.tstack.splice(p + 1, 1);
- this.tstack[p].push(tmpVal);
- }
-
- this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
-};
-
-TJSONProtocol.prototype.writeSetBegin = function(etype, size) {
- this.tpos.push(this.tstack.length);
- this.tstack.push([TJSONProtocol.Type[etype], size]);
-};
-
-TJSONProtocol.prototype.writeSetEnd = function() {
- var p = this.tpos.pop();
-
- while (this.tstack.length > p + 1) {
- var tmpVal = this.tstack[p + 1];
- this.tstack.splice(p + 1, 1);
- this.tstack[p].push(tmpVal);
- }
-
- this.tstack[p] = '[' + this.tstack[p].join(',') + ']';
-};
-
-TJSONProtocol.prototype.writeBool = function(bool) {
- this.tstack.push(bool ? 1 : 0);
-};
-
-TJSONProtocol.prototype.writeByte = function(byte) {
- this.tstack.push(byte);
-};
-
-TJSONProtocol.prototype.writeI16 = function(i16) {
- this.tstack.push(i16);
-};
-
-TJSONProtocol.prototype.writeI32 = function(i32) {
- this.tstack.push(i32);
-};
-
-TJSONProtocol.prototype.writeI64 = function(i64) {
- this.tstack.push(i64);
-};
-
-TJSONProtocol.prototype.writeDouble = function(dub) {
- this.tstack.push(dub);
-};
-
-TJSONProtocol.prototype.writeString = function(str) {
- // We do not encode uri components for wire transfer:
- if (str === null) {
- this.tstack.push(null);
- } else {
- // concat may be slower than building a byte buffer
- var escapedString = '';
- for (var i = 0; i < str.length; i++) {
- var ch = str.charAt(i); // a single double quote: "
- if (ch === '\"') {
- escapedString += '\\\"'; // write out as: \"
- } else if (ch === '\\') { // a single backslash: \
- escapedString += '\\\\'; // write out as: \\
- /* Currently escaped forward slashes break TJSONProtocol.
- * As it stands, we can simply pass forward slashes into
- * our strings across the wire without being escaped.
- * I think this is the protocol's bug, not thrift.js
- * } else if(ch === '/') { // a single forward slash: /
- * escapedString += '\\/'; // write out as \/
- * }
- */
- } else if (ch === '\b') { // a single backspace: invisible
- escapedString += '\\b'; // write out as: \b"
- } else if (ch === '\f') { // a single formfeed: invisible
- escapedString += '\\f'; // write out as: \f"
- } else if (ch === '\n') { // a single newline: invisible
- escapedString += '\\n'; // write out as: \n"
- } else if (ch === '\r') { // a single return: invisible
- escapedString += '\\r'; // write out as: \r"
- } else if (ch === '\t') { // a single tab: invisible
- escapedString += '\\t'; // write out as: \t"
- } else {
- escapedString += ch; // Else it need not be escaped
- }
- }
- this.tstack.push('"' + escapedString + '"');
- }
-};
-
-TJSONProtocol.prototype.writeBinary = function(arg) {
- this.writeString(arg);
-};
-
-TJSONProtocol.prototype.readMessageBegin = function() {
- this.rstack = [];
- this.rpos = [];
-
- //Borrow the inbound transport buffer and ensure data is present/consistent
- var transBuf = this.trans.borrow();
- if (transBuf.readIndex >= transBuf.writeIndex) {
- throw new InputBufferUnderrunError();
- }
- var cursor = transBuf.readIndex;
-
- if (transBuf.buf[cursor] !== 0x5B) { //[
- throw new Error("Malformed JSON input, no opening bracket");
- }
-
- //Parse a single message (there may be several in the buffer)
- // TODO: Handle characters using multiple code units
- cursor++;
- var openBracketCount = 1;
- var inString = false;
- for (; cursor < transBuf.writeIndex; cursor++) {
- var chr = transBuf.buf[cursor];
- //we use hexa charcode here because data[i] returns an int and not a char
- if (inString) {
- if (chr === 0x22) { //"
- inString = false;
- } else if (chr === 0x5C) { //\
- //escaped character, skip
- cursor += 1;
- }
- } else {
- if (chr === 0x5B) { //[
- openBracketCount += 1;
- } else if (chr === 0x5D) { //]
- openBracketCount -= 1;
- if (openBracketCount === 0) {
- //end of json message detected
- break;
- }
- } else if (chr === 0x22) { //"
- inString = true;
- }
- }
- }
-
- if (openBracketCount !== 0) {
- throw new Error("Malformed JSON input, mismatched backets");
- }
-
- //Reconstitute the JSON object and conume the necessary bytes
- this.robj = JSON.parse(transBuf.buf.slice(transBuf.readIndex, cursor+1));
- this.trans.consume(cursor + 1 - transBuf.readIndex);
-
- //Verify the protocol version
- var version = this.robj.shift();
- if (version != TJSONProtocol.Version) {
- throw 'Wrong thrift protocol version: ' + version;
- }
-
- //Objectify the thrift message {name/type/sequence-number} for return
- // and then save the JSON object in rstack
- var r = {};
- r.fname = this.robj.shift();
- r.mtype = this.robj.shift();
- r.rseqid = this.robj.shift();
- this.rstack.push(this.robj.shift());
- return r;
-};
-
-TJSONProtocol.prototype.readMessageEnd = function() {
-};
-
-TJSONProtocol.prototype.readStructBegin = function() {
- var r = {};
- r.fname = '';
-
- //incase this is an array of structs
- if (this.rstack[this.rstack.length - 1] instanceof Array) {
- this.rstack.push(this.rstack[this.rstack.length - 1].shift());
- }
-
- return r;
-};
-
-TJSONProtocol.prototype.readStructEnd = function() {
- this.rstack.pop();
-};
-
-TJSONProtocol.prototype.readFieldBegin = function() {
- var r = {};
-
- var fid = -1;
- var ftype = Thrift.Type.STOP;
-
- //get a fieldId
- for (var f in (this.rstack[this.rstack.length - 1])) {
- if (f === null) {
- continue;
- }
-
- fid = parseInt(f, 10);
- this.rpos.push(this.rstack.length);
-
- var field = this.rstack[this.rstack.length - 1][fid];
-
- //remove so we don't see it again
- delete this.rstack[this.rstack.length - 1][fid];
-
- this.rstack.push(field);
-
- break;
- }
-
- if (fid != -1) {
- //should only be 1 of these but this is the only
- //way to match a key
- for (var i in (this.rstack[this.rstack.length - 1])) {
- if (TJSONProtocol.RType[i] === null) {
- continue;
- }
-
- ftype = TJSONProtocol.RType[i];
- this.rstack[this.rstack.length - 1] = this.rstack[this.rstack.length - 1][i];
- }
- }
-
- r.fname = '';
- r.ftype = ftype;
- r.fid = fid;
-
- return r;
-};
-
-TJSONProtocol.prototype.readFieldEnd = function() {
- var pos = this.rpos.pop();
-
- //get back to the right place in the stack
- while (this.rstack.length > pos) {
- this.rstack.pop();
- }
-};
-
-TJSONProtocol.prototype.readMapBegin = function() {
- var map = this.rstack.pop();
-
- var r = {};
- r.ktype = TJSONProtocol.RType[map.shift()];
- r.vtype = TJSONProtocol.RType[map.shift()];
- r.size = map.shift();
-
-
- this.rpos.push(this.rstack.length);
- this.rstack.push(map.shift());
-
- return r;
-};
-
-TJSONProtocol.prototype.readMapEnd = function() {
- this.readFieldEnd();
-};
-
-TJSONProtocol.prototype.readListBegin = function() {
- var list = this.rstack[this.rstack.length - 1];
-
- var r = {};
- r.etype = TJSONProtocol.RType[list.shift()];
- r.size = list.shift();
-
- this.rpos.push(this.rstack.length);
- this.rstack.push(list);
-
- return r;
-};
-
-TJSONProtocol.prototype.readListEnd = function() {
- this.readFieldEnd();
-};
-
-TJSONProtocol.prototype.readSetBegin = function() {
- return this.readListBegin();
-};
-
-TJSONProtocol.prototype.readSetEnd = function() {
- return this.readListEnd();
-};
-
-TJSONProtocol.prototype.readBool = function() {
- var r = this.readI32();
-
- if (r !== null && r.value == '1') {
- r.value = true;
- } else {
- r.value = false;
- }
-
- return r;
-};
-
-TJSONProtocol.prototype.readByte = function() {
- return this.readI32();
-};
-
-TJSONProtocol.prototype.readI16 = function() {
- return this.readI32();
-};
-
-TJSONProtocol.prototype.readI32 = function(f) {
- if (f === undefined) {
- f = this.rstack[this.rstack.length - 1];
- }
-
- var r = {};
-
- if (f instanceof Array) {
- if (f.length === 0) {
- r.value = undefined;
- } else {
- r.value = f.shift();
- }
- } else if (f instanceof Object) {
- for (var i in f) {
- if (i === null) {
- continue;
- }
- this.rstack.push(f[i]);
- delete f[i];
-
- r.value = i;
- break;
- }
- } else {
- r.value = f;
- this.rstack.pop();
- }
-
- return r.value;
-};
-
-TJSONProtocol.prototype.readI64 = function() {
- return new Int64(this.readI32());
-};
-
-TJSONProtocol.prototype.readDouble = function() {
- return this.readI32();
-};
-
-TJSONProtocol.prototype.readBinary = function() {
- return this.readString();
-};
-
-TJSONProtocol.prototype.readString = function() {
- var r = this.readI32();
- return r;
-};
-
-TJSONProtocol.prototype.getTransport = function() {
- return this.trans;
-};
-
-//Method to arbitrarily skip over data.
-TJSONProtocol.prototype.skip = function(type) {
- throw 'skip not supported yet';
-};
-
+module.exports.TBinaryProtocol = require('./binary_protocol');
+module.exports.TCompactProtocol = require('./compact_protocol');
+module.exports.TJSONProtocol = require('./json_protocol');
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index 8715f2c..921bb86 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -19,9 +19,9 @@
var net = require('net');
var tls = require('tls');
-var ttransport = require('./transport'),
- TBinaryProtocol = require('./protocol').TBinaryProtocol;
-
+var TBufferedTransport = require('./buffered_transport');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
/**
* Create a Thrift server which can serve one or multiple services.
@@ -31,13 +31,13 @@
* @returns {object} - The Apache Thrift Multiplex Server.
*/
exports.createMultiplexServer = function(processor, options) {
- var transport = (options && options.transport) ? options.transport : ttransport.TBufferedTransport;
+ var transport = (options && options.transport) ? options.transport : TBufferedTransport;
var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol;
function serverImpl(stream) {
var self = this;
- stream.on('error', function(err) {
- self.emit('error', err);
+ stream.on('error', function(err) {
+ self.emit('error', err);
});
stream.on('data', transport.receiver(function(transportWithData) {
var input = new protocol(transportWithData);
@@ -56,7 +56,7 @@
transportWithData.commitPosition();
} while (true);
} catch (err) {
- if (err instanceof ttransport.InputBufferUnderrunError) {
+ if (err instanceof InputBufferUnderrunError) {
//The last data in the buffer was not a complete message, wait for the rest
transportWithData.rollbackPosition();
}
diff --git a/lib/nodejs/lib/thrift/transport.js b/lib/nodejs/lib/thrift/transport.js
index 6d4224a..59daa98 100644
--- a/lib/nodejs/lib/thrift/transport.js
+++ b/lib/nodejs/lib/thrift/transport.js
@@ -16,285 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-var emptyBuf = new Buffer(0);
-var binary = require('./binary');
-var util = require("util");
-
-var InputBufferUnderrunError = exports.InputBufferUnderrunError = function(message) {
- Error.call(this, message);
-};
-util.inherits(InputBufferUnderrunError, Error);
-
-var TFramedTransport = exports.TFramedTransport = function(buffer, callback) {
- this.inBuf = buffer || emptyBuf;
- this.outBuffers = [];
- this.outCount = 0;
- this.readPos = 0;
- this.onFlush = callback;
-};
-TFramedTransport.receiver = function(callback) {
- var residual = null;
-
- return function(data) {
- // Prepend any residual data from our previous read
- if (residual) {
- data = Buffer.concat([residual, data]);
- residual = null;
- }
-
- // framed transport
- while (data.length) {
- if (data.length < 4) {
- // Not enough bytes to continue, save and resume on next packet
- residual = data;
- return;
- }
- var frameSize = binary.readI32(data, 0);
- if (data.length < 4 + frameSize) {
- // Not enough bytes to continue, save and resume on next packet
- residual = data;
- return;
- }
-
- var frame = data.slice(4, 4 + frameSize);
- residual = data.slice(4 + frameSize);
-
- callback(new TFramedTransport(frame));
-
- data = residual;
- residual = null;
- }
- };
-};
-
-TFramedTransport.prototype = {
- commitPosition: function(){},
- rollbackPosition: function(){},
-
- // TODO: Implement open/close support
- isOpen: function() {return true;},
- open: function() {},
- close: function() {},
-
- ensureAvailable: function(len) {
- if (this.readPos + len > this.inBuf.length) {
- throw new InputBufferUnderrunError();
- }
- },
-
- read: function(len) { // this function will be used for each frames.
- this.ensureAvailable(len);
- var end = this.readPos + len;
-
- if (this.inBuf.length < end) {
- throw new Error('read(' + len + ') failed - not enough data');
- }
-
- var buf = this.inBuf.slice(this.readPos, end);
- this.readPos = end;
- return buf;
- },
-
- readByte: function() {
- this.ensureAvailable(1);
- return binary.readByte(this.inBuf[this.readPos++]);
- },
-
- readI16: function() {
- this.ensureAvailable(2);
- var i16 = binary.readI16(this.inBuf, this.readPos);
- this.readPos += 2;
- return i16;
- },
-
- readI32: function() {
- this.ensureAvailable(4);
- var i32 = binary.readI32(this.inBuf, this.readPos);
- this.readPos += 4;
- return i32;
- },
-
- readDouble: function() {
- this.ensureAvailable(8);
- var d = binary.readDouble(this.inBuf, this.readPos);
- this.readPos += 8;
- return d;
- },
-
- readString: function(len) {
- this.ensureAvailable(len);
- var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len);
- this.readPos += len;
- return str;
- },
-
- borrow: function() {
- return { buf: this.inBuf, readIndex: this.readPos, writeIndex: this.inBuf.length };
- },
-
- consume: function(bytesConsumed) {
- this.readPos += bytesConsumed;
- },
-
- write: function(buf, encoding) {
- if (typeof(buf) === "string") {
- buf = new Buffer(buf, encoding || 'utf8');
- }
- this.outBuffers.push(buf);
- this.outCount += buf.length;
- },
-
- flush: function() {
- var out = new Buffer(this.outCount),
- pos = 0;
- this.outBuffers.forEach(function(buf) {
- buf.copy(out, pos, 0);
- pos += buf.length;
- });
-
- if (this.onFlush) {
- // TODO: optimize this better, allocate one buffer instead of both:
- var msg = new Buffer(out.length + 4);
- binary.writeI32(msg, out.length);
- out.copy(msg, 4, 0, out.length);
- this.onFlush(msg);
- }
-
- this.outBuffers = [];
- this.outCount = 0;
- }
-};
-
-var TBufferedTransport = exports.TBufferedTransport = function(buffer, callback) {
- this.defaultReadBufferSize = 1024;
- this.writeBufferSize = 512; // Soft Limit
- this.inBuf = new Buffer(this.defaultReadBufferSize);
- this.readCursor = 0;
- this.writeCursor = 0; // for input buffer
- this.outBuffers = [];
- this.outCount = 0;
- this.onFlush = callback;
-};
-TBufferedTransport.receiver = function(callback) {
- var reader = new TBufferedTransport();
-
- return function(data) {
- if (reader.writeCursor + data.length > reader.inBuf.length) {
- var buf = new Buffer(reader.writeCursor + data.length);
- reader.inBuf.copy(buf, 0, 0, reader.writeCursor);
- reader.inBuf = buf;
- }
- data.copy(reader.inBuf, reader.writeCursor, 0);
- reader.writeCursor += data.length;
-
- callback(reader);
- };
-};
-
-TBufferedTransport.prototype = {
- commitPosition: function(){
- var unreadSize = this.writeCursor - this.readCursor;
- var bufSize = (unreadSize * 2 > this.defaultReadBufferSize) ?
- unreadSize * 2 : this.defaultReadBufferSize;
- var buf = new Buffer(bufSize);
- if (unreadSize > 0) {
- this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor);
- }
- this.readCursor = 0;
- this.writeCursor = unreadSize;
- this.inBuf = buf;
- },
- rollbackPosition: function(){
- this.readCursor = 0;
- },
-
- // TODO: Implement open/close support
- isOpen: function() {return true;},
- open: function() {},
- close: function() {},
-
- ensureAvailable: function(len) {
- if (this.readCursor + len > this.writeCursor) {
- throw new InputBufferUnderrunError();
- }
- },
-
- read: function(len) {
- this.ensureAvailable(len);
- var buf = new Buffer(len);
- this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
- this.readCursor += len;
- return buf;
- },
-
- readByte: function() {
- this.ensureAvailable(1);
- return binary.readByte(this.inBuf[this.readCursor++]);
- },
-
- readI16: function() {
- this.ensureAvailable(2);
- var i16 = binary.readI16(this.inBuf, this.readCursor);
- this.readCursor += 2;
- return i16;
- },
-
- readI32: function() {
- this.ensureAvailable(4);
- var i32 = binary.readI32(this.inBuf, this.readCursor);
- this.readCursor += 4;
- return i32;
- },
-
- readDouble: function() {
- this.ensureAvailable(8);
- var d = binary.readDouble(this.inBuf, this.readCursor);
- this.readCursor += 8;
- return d;
- },
-
- readString: function(len) {
- this.ensureAvailable(len);
- var str = this.inBuf.toString('utf8', this.readCursor, this.readCursor + len);
- this.readCursor += len;
- return str;
- },
-
- borrow: function() {
- var obj = {buf: this.inBuf, readIndex: this.readCursor, writeIndex: this.writeCursor};
- return obj;
- },
-
- consume: function(bytesConsumed) {
- this.readCursor += bytesConsumed;
- },
-
- write: function(buf) {
- if (typeof(buf) === "string") {
- buf = new Buffer(buf, 'utf8');
- }
- this.outBuffers.push(buf);
- this.outCount += buf.length;
- },
-
- flush: function() {
- if (this.outCount < 1) {
- return;
- }
-
- var msg = new Buffer(this.outCount),
- pos = 0;
- this.outBuffers.forEach(function(buf) {
- buf.copy(msg, pos, 0);
- pos += buf.length;
- });
-
- if (this.onFlush) {
- this.onFlush(msg);
- }
-
- this.outBuffers = [];
- this.outCount = 0;
- }
-};
+module.exports.TBufferedTransport = require('./buffered_transport');
+module.exports.TFramedTransport = require('./framed_transport');
+module.exports.InputBufferUnderrunError = require('./input_buffer_underrun_error');
diff --git a/lib/nodejs/lib/thrift/web_server.js b/lib/nodejs/lib/thrift/web_server.js
index c575c6d..68eb94d 100644
--- a/lib/nodejs/lib/thrift/web_server.js
+++ b/lib/nodejs/lib/thrift/web_server.js
@@ -24,11 +24,12 @@
var crypto = require("crypto");
var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor;
-var TTransport = require('./transport');
-var TBufferedTransport = require('./transport').TBufferedTransport;
-var TBinaryProtocol = require('./protocol').TBinaryProtocol;
-// WSFrame constructor and prototype
+var TBufferedTransport = require('./buffered_transport');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+// WSFrame constructor and prototype
/////////////////////////////////////////////////////////////////////
/** Apache Thrift RPC Web Socket Transport
@@ -44,11 +45,11 @@
* - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol
* - Mask Present bit is 1 sending to-server and 0 sending to-client
* - Payload Len:
- * + If < 126: then represented directly
+ * + If < 126: then represented directly
* + If >=126: but within range of an unsigned 16 bit integer
- * then Payload Len is 126 and the two following bytes store
+ * then Payload Len is 126 and the two following bytes store
* the length
- * + Else: Payload Len is 127 and the following 8 bytes store the
+ * + Else: Payload Len is 127 and the following 8 bytes store the
* length as an unsigned 64 bit integer
* - Masking key is a 32 bit key only present when sending to the server
* - Payload follows the masking key or length
@@ -73,9 +74,9 @@
* +---------------------------------------------------------------+
*/
var wsFrame = {
- /** Encodes a WebSocket frame
+ /** Encodes a WebSocket frame
*
- * @param {Buffer} data - The raw data to encode
+ * @param {Buffer} data - The raw data to encode
* @param {Buffer} mask - The mask to apply when sending to server, null for no mask
* @param {Boolean} binEncoding - True for binary encoding, false for text encoding
* @returns {Buffer} - The WebSocket frame, ready to send
@@ -83,18 +84,18 @@
encode: function(data, mask, binEncoding) {
var frame = new Buffer(wsFrame.frameSizeFromData(data, mask));
//Byte 0 - FIN & OPCODE
- frame[0] = wsFrame.fin.FIN +
+ frame[0] = wsFrame.fin.FIN +
(binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT);
//Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE
var payloadOffset = 2;
if (data.length < 0x7E) {
frame[1] = data.length + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
} else if (data.length < 0xFFFF) {
- frame[1] = 0x7E + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
+ frame[1] = 0x7E + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
frame.writeUInt16BE(data.length, 2, true);
payloadOffset = 4;
} else {
- frame[1] = 0x7F + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
+ frame[1] = 0x7F + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
frame.writeUInt32BE(0, 2, true);
frame.writeUInt32BE(data.length, 6, true);
payloadOffset = 10;
@@ -116,18 +117,18 @@
* @class
* @name WSDecodeResult
* @property {Buffer} data - The decoded data for the first ATRPC message
- * @property {Buffer} mask - The frame mask
- * @property {Boolean} binEncoding - True if binary (TBinaryProtocol),
+ * @property {Buffer} mask - The frame mask
+ * @property {Boolean} binEncoding - True if binary (TBinaryProtocol),
* False if text (TJSONProtocol)
- * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a
+ * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a
* single WebSocket frame, this Buffer contains
* any bytes remaining to be decoded
* @property {Boolean} FIN - True is the message is complete
*/
-
- /** Decodes a WebSocket frame
+
+ /** Decodes a WebSocket frame
*
- * @param {Buffer} frame - The raw inbound frame, if this is a continuation
+ * @param {Buffer} frame - The raw inbound frame, if this is a continuation
* frame it must have a mask property with the mask.
* @returns {WSDecodeResult} - The decoded payload
*
@@ -163,7 +164,7 @@
result.mask = new Buffer(4);
frame.copy(result.mask, 0, dataOffset, dataOffset + 4);
dataOffset += 4;
- }
+ }
//Payload
result.data = new Buffer(len);
frame.copy(result.data, 0, dataOffset, dataOffset+len);
@@ -183,7 +184,7 @@
return result;
},
- /** Masks/Unmasks data
+ /** Masks/Unmasks data
*
* @param {Buffer} data - data to mask/unmask in place
* @param {Buffer} mask - the mask
@@ -203,7 +204,7 @@
* @param {Boolean} mask - true if a mask will be sent (TO_SERVER)
*/
frameSizeFromData: function(data, mask) {
- var headerSize = 10;
+ var headerSize = 10;
if (data.length < 0x7E) {
headerSize = 2;
} else if (data.length < 0xFFFF) {
@@ -237,15 +238,15 @@
/**
* @class
* @name ServerOptions
- * @property {array} cors - Array of CORS origin strings to permit requests from.
- * @property {string} files - Path to serve static files from, if absent or ""
+ * @property {array} cors - Array of CORS origin strings to permit requests from.
+ * @property {string} files - Path to serve static files from, if absent or ""
* static file service is disabled.
* @property {object} headers - An object hash mapping header strings to header value
- * strings, these headers are transmitted in response to
+ * strings, these headers are transmitted in response to
* static file GET operations.
- * @property {object} services - An object hash mapping service URI strings
+ * @property {object} services - An object hash mapping service URI strings
* to ServiceOptions objects
- * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html),
+ * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html),
* if not present or null regular http is used,
* at least a key and a cert must be defined to use SSL/TLS
* @see {@link ServiceOptions}
@@ -254,19 +255,19 @@
/**
* @class
* @name ServiceOptions
- * @property {object} transport - The layered transport to use (defaults
+ * @property {object} transport - The layered transport to use (defaults
* to TBufferedTransport).
- * @property {object} protocol - The serialization Protocol to use (defaults to
+ * @property {object} protocol - The serialization Protocol to use (defaults to
* TBinaryProtocol).
- * @property {object} processor - The Thrift Service class/processor generated
- * by the IDL Compiler for the service (the "cls"
+ * @property {object} processor - The Thrift Service class/processor generated
+ * by the IDL Compiler for the service (the "cls"
* key can also be used for this attribute).
* @property {object} handler - The handler methods for the Thrift Service.
*/
-/**
+/**
* Create a Thrift server which can serve static files and/or one or
- * more Thrift Services.
+ * more Thrift Services.
* @param {ServerOptions} options - The server configuration.
* @returns {object} - The Apache Thrift Web Server.
*/
@@ -290,7 +291,7 @@
var services = options.services;
for (var uri in services) {
var svcObj = services[uri];
-
+
//Setup the processor
if (svcObj.processor instanceof MultiplexedProcessor) {
//Multiplex processors have pre embedded processor/handler pairs, save as is
@@ -298,9 +299,9 @@
} else {
//For historical reasons Node.js supports processors passed in directly or via the
// IDL Compiler generated class housing the processor. Also, the options property
- // for a Processor has been called both cls and processor at different times. We
+ // for a Processor has been called both cls and processor at different times. We
// support any of the four possibilities here.
- var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) :
+ var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) :
(svcObj.cls.Processor || svcObj.cls);
//Processors can be supplied as constructed objects with handlers already embedded,
// if a handler is provided we construct a new processor, if not we use the processor
@@ -314,7 +315,7 @@
svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport;
svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol;
}
-
+
//Verify CORS requirements
function VerifyCORSAndSetHeaders(request, response) {
if (request.headers.origin && options.cors) {
@@ -333,7 +334,7 @@
//Allow, CORS is not in use
return true;
}
-
+
//Handle OPTIONS method (CORS)
///////////////////////////////////////////////////
@@ -345,8 +346,8 @@
}
response.end();
}
-
-
+
+
//Handle POST methods (TXHRTransport)
///////////////////////////////////////////////////
function processPost(request, response) {
@@ -365,7 +366,7 @@
response.end();
return;
}
-
+
//Process XHR payload
request.on('data', svc.transport.receiver(function(transportWithData) {
var input = new svc.protocol(transportWithData);
@@ -383,7 +384,7 @@
svc.processor.process(input, output);
transportWithData.commitPosition();
} catch (err) {
- if (err instanceof TTransport.InputBufferUnderrunError) {
+ if (err instanceof InputBufferUnderrunError) {
transportWithData.rollbackPosition();
} else {
response.writeHead(500);
@@ -401,7 +402,7 @@
if (!baseDir || "" === baseDir) {
response.writeHead(404);
response.end();
- return;
+ return;
}
//Verify CORS requirements
@@ -420,11 +421,11 @@
response.end();
return;
}
-
+
if (fs.statSync(filename).isDirectory()) {
filename += '/index.html';
}
-
+
fs.readFile(filename, "binary", function(err, file) {
if (err) {
response.writeHead(500);
@@ -466,7 +467,7 @@
transportWithData.commitPosition();
}
catch (err) {
- if (err instanceof TTransport.InputBufferUnderrunError) {
+ if (err instanceof InputBufferUnderrunError) {
transportWithData.rollbackPosition();
}
else {
@@ -485,7 +486,7 @@
}
//Wire up listeners for upgrade(to WebSocket) & request methods for:
- // - GET static files,
+ // - GET static files,
// - POST XHR Thrift services
// - OPTIONS CORS requests
server.on('request', function(request, response) {
diff --git a/lib/nodejs/lib/thrift/ws_connection.js b/lib/nodejs/lib/thrift/ws_connection.js
index 54dd936..0812934 100644
--- a/lib/nodejs/lib/thrift/ws_connection.js
+++ b/lib/nodejs/lib/thrift/ws_connection.js
@@ -23,6 +23,12 @@
var ttransport = require('./transport');
var tprotocol = require('./protocol');
+var TBufferedTransport = require('./buffered_transport');
+var TJSONProtocol = require('./json_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+var createClient = require('./create_client');
+
/**
* @class
* @name WSConnectOptions
@@ -77,12 +83,12 @@
this.host = host;
this.port = port;
this.secure = this.options.secure || false;
- this.transport = this.options.transport || ttransport.TBufferedTransport;
- this.protocol = this.options.protocol || tprotocol.TJSONProtocol;
+ this.transport = this.options.transport || TBufferedTransport;
+ this.protocol = this.options.protocol || TJSONProtocol;
this.path = this.options.path;
this.send_pending = [];
- //The sequence map is used to map seqIDs back to the
+ //The sequence map is used to map seqIDs back to the
// calling client in multiplexed scenarios
this.seqId2Service = {};
@@ -108,7 +114,7 @@
var self = this;
this.emit("open");
if (this.send_pending.length > 0) {
- //If the user made calls before the connection was fully
+ //If the user made calls before the connection was fully
//open, send them now
this.send_pending.forEach(function(data) {
self.socket.send(data);
@@ -132,12 +138,12 @@
//The Multiplexed Protocol stores a hash of seqid to service names
// in seqId2Service. If the SeqId is found in the hash we need to
// lookup the appropriate client for this call.
- // The client var is a single client object when not multiplexing,
+ // The client var is a single client object when not multiplexing,
// when using multiplexing it is a service name keyed hash of client
// objects.
//NOTE: The 2 way interdependencies between protocols, transports,
// connections and clients in the Node.js implementation are irregular
- // and make the implementation difficult to extend and maintain. We
+ // and make the implementation difficult to extend and maintain. We
// should bring this stuff inline with typical thrift I/O stack
// operation soon.
// --ra
@@ -167,7 +173,7 @@
}
}
} catch (e) {
- if (e instanceof ttransport.InputBufferUnderrunError) {
+ if (e instanceof InputBufferUnderrunError) {
transport_with_data.rollbackPosition();
} else {
throw e;
@@ -183,8 +189,8 @@
this.transport.receiver(this.__decodeCallback.bind(this))(buf);
};
-WSConnection.prototype.__onMessage = function(evt) {
+WSConnection.prototype.__onMessage = function(evt) {
this.__onData(evt.data);
};
@@ -230,7 +236,6 @@
* Return URI for the connection
* @returns {string} URI
*/
-
WSConnection.prototype.uri = function() {
var schema = this.secure ? 'wss' : 'ws';
var port = '';
@@ -276,21 +281,4 @@
return new WSConnection(host, port, options);
};
-/**
- * Creates a new client object for the specified Thrift service.
- * @param {object} cls - The module containing the service client
- * @param {WSConnection} wsConnection - The connection to use.
- * @returns {object} The client object.
- * @see {@link createWSConnection}
- */
-exports.createWSClient = function(cls, wsConnection) {
- if (cls.Client) {
- cls = cls.Client;
- }
- wsConnection.client =
- new cls(new wsConnection.transport(undefined, function(buf) {
- wsConnection.write(buf);
- }),
- wsConnection.protocol);
- return wsConnection.client;
-};
+exports.createWSClient = createClient;
diff --git a/lib/nodejs/test/binary.test.js b/lib/nodejs/test/binary.test.js
index 58feebf..dacadef 100644
--- a/lib/nodejs/test/binary.test.js
+++ b/lib/nodejs/test/binary.test.js
@@ -24,7 +24,7 @@
"Should read signed byte": function(test){
test.strictEqual(1, binary.readByte(0x01));
test.strictEqual(-1, binary.readByte(0xFF));
-
+
test.strictEqual(127, binary.readByte(0x7F));
test.strictEqual(-128, binary.readByte(0x80));
test.done();
@@ -123,11 +123,11 @@
test.deepEqual([0x3f, 0xd5, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55], binary.writeDouble([], 1/3));
// Min subnormal positive double
- test.deepEqual([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01], binary.writeDouble([], 4.9406564584124654e-324));
+ test.deepEqual([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01], binary.writeDouble([], 4.9406564584124654e-324));
// Min normal positive double
- test.deepEqual([0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], binary.writeDouble([], 2.2250738585072014e-308));
+ test.deepEqual([0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], binary.writeDouble([], 2.2250738585072014e-308));
// Max positive double
- test.deepEqual([0x7f, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff], binary.writeDouble([], 1.7976931348623157e308));
+ test.deepEqual([0x7f, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff], binary.writeDouble([], 1.7976931348623157e308));
test.done();
}
});
diff --git a/lib/nodejs/test/client.js b/lib/nodejs/test/client.js
index 3b5f1cb..b8de7ce 100755
--- a/lib/nodejs/test/client.js
+++ b/lib/nodejs/test/client.js
@@ -58,7 +58,7 @@
protocol = thrift.TJSONProtocol;
} else if (program.protocol === "compact") {
protocol = thrift.TCompactProtocol;
-}
+}
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
@@ -88,7 +88,7 @@
var testDriver = ThriftTestDriver;
if (program.promise) {
testDriver = ThriftTestDriverPromise;
-}
+}
testDriver(client, function (status) {
console.log(status);
connection.end();
diff --git a/lib/nodejs/test/http_client.js b/lib/nodejs/test/http_client.js
index 14c1a29..9ab05d8 100644
--- a/lib/nodejs/test/http_client.js
+++ b/lib/nodejs/test/http_client.js
@@ -41,7 +41,7 @@
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
-}
+}
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
@@ -58,7 +58,7 @@
if (program.ssl) {
options.nodeOptions = { rejectUnauthorized: false };
options.https = true;
-}
+}
var connection = thrift.createHttpConnection("localhost", 9090, options);
@@ -72,7 +72,7 @@
if (program.promise) {
console.log(" --Testing promise style client");
testDriver = ThriftTestDriverPromise;
-}
+}
testDriver(client, function (status) {
console.log(status);
process.exit(0);
diff --git a/lib/nodejs/test/http_server.js b/lib/nodejs/test/http_server.js
index f12e695..3519f4a 100644
--- a/lib/nodejs/test/http_server.js
+++ b/lib/nodejs/test/http_server.js
@@ -36,31 +36,31 @@
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
transport = thrift.TFramedTransport;
-}
+}
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
-}
+}
var handler = ThriftTestHandler;
if (program.promise) {
handler = ThriftTestHandlerPromise;
-}
+}
-var SvcOpt = {
- handler: handler,
- processor: ThriftTest,
- protocol: protocol,
- transport: transport
-};
-var serverOpt = { services: { "/test": SvcOpt } };
+var SvcOpt = {
+ handler: handler,
+ processor: ThriftTest,
+ protocol: protocol,
+ transport: transport
+};
+var serverOpt = { services: { "/test": SvcOpt } };
if (program.ssl) {
serverOpt.tls = {
key: fs.readFileSync(path.resolve(__dirname, 'server.key')),
cert: fs.readFileSync(path.resolve(__dirname, 'server.crt'))
};
}
-thrift.createWebServer(serverOpt).listen(9090);
+thrift.createWebServer(serverOpt).listen(9090);
diff --git a/lib/nodejs/test/multiplex_client.js b/lib/nodejs/test/multiplex_client.js
index 7b58205..7004f93 100644
--- a/lib/nodejs/test/multiplex_client.js
+++ b/lib/nodejs/test/multiplex_client.js
@@ -40,7 +40,7 @@
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
-}
+}
var options = {
transport: transport,
diff --git a/lib/nodejs/test/server.js b/lib/nodejs/test/server.js
index 378a6e2..b6d28c7 100755
--- a/lib/nodejs/test/server.js
+++ b/lib/nodejs/test/server.js
@@ -39,14 +39,14 @@
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
transport = thrift.TFramedTransport;
-}
+}
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
} else if (program.protocol === "compact") {
protocol = thrift.TCompactProtocol;
-}
+}
var port = 9090;
if (String(program.port) === "undefined"){
@@ -57,7 +57,7 @@
var handler = ThriftTestHandler;
if (program.promise) {
handler = ThriftTestHandlerPromise;
-}
+}
var options = {
protocol: protocol,
diff --git a/lib/nodejs/test/test_handler.js b/lib/nodejs/test/test_handler.js
index 09ff39f..fd25120 100644
--- a/lib/nodejs/test/test_handler.js
+++ b/lib/nodejs/test/test_handler.js
@@ -17,7 +17,7 @@
* under the License.
*/
-//This is the server side Node test handler for the standard
+//This is the server side Node test handler for the standard
// Apache Thrift test service.
var ttypes = require('./gen-nodejs/ThriftTest_types');
diff --git a/lib/nodejs/test/thrift_test_driver.js b/lib/nodejs/test/thrift_test_driver.js
index 5ddcb21..306f67d 100644
--- a/lib/nodejs/test/thrift_test_driver.js
+++ b/lib/nodejs/test/thrift_test_driver.js
@@ -17,13 +17,13 @@
* under the License.
*/
- // This is the Node.js test driver for the standard Apache Thrift
- // test service. The driver invokes every function defined in the
+ // This is the Node.js test driver for the standard Apache Thrift
+ // test service. The driver invokes every function defined in the
// Thrift Test service with a representative range of parameters.
//
// The ThriftTestDriver function requires a client object
// connected to a server hosting the Thrift Test service and
- // supports an optional callback function which is called with
+ // supports an optional callback function which is called with
// a status message when the test is complete.
var assert = require('assert');
@@ -31,15 +31,15 @@
var Int64 = require('node-int64');
var ThriftTestDriver = exports.ThriftTestDriver = function(client, callback) {
-
+
function checkRecursively(map1, map2) {
if (typeof map1 !== 'function' && typeof map2 !== 'function') {
if (!map1 || typeof map1 !== 'object') {
//Handle int64 types (which use node-int64 in Node.js JavaScript)
- if ((typeof map1 === "number") && (typeof map2 === "object") &&
+ if ((typeof map1 === "number") && (typeof map2 === "object") &&
(map2.buffer) && (map2.buffer instanceof Buffer) && (map2.buffer.length === 8)) {
var n = new Int64(map2.buffer);
- assert.equal(map1, n.toNumber());
+ assert.equal(map1, n.toNumber());
} else {
assert.equal(map1, map2);
}
diff --git a/lib/nodejs/test/thrift_test_driver_promise.js b/lib/nodejs/test/thrift_test_driver_promise.js
index 74a91bd..9b991ef 100644
--- a/lib/nodejs/test/thrift_test_driver_promise.js
+++ b/lib/nodejs/test/thrift_test_driver_promise.js
@@ -202,9 +202,9 @@
.fail(function() {
assert(false);
});
-
-// TODO: add testBinary()
-
+
+// TODO: add testBinary()
+
var out = new ttypes.Xtruct({
string_thing: 'Zero',
byte_thing: 1,
diff --git a/lib/nodejs/test/ws_client.js b/lib/nodejs/test/ws_client.js
index 4573246..93b93b7 100644
--- a/lib/nodejs/test/ws_client.js
+++ b/lib/nodejs/test/ws_client.js
@@ -40,7 +40,7 @@
var protocol = thrift.TBinaryProtocol;
if (program.protocol === "json") {
protocol = thrift.TJSONProtocol;
-}
+}
var transport = thrift.TBufferedTransport;
if (program.transport === "framed") {
@@ -56,7 +56,7 @@
if (program.ssl) {
options.wsOptions = { rejectUnauthorized: false };
options.secure = true;
-}
+}
var connection = thrift.createWSConnection("localhost", 9090, options);
connection.open();
@@ -71,7 +71,7 @@
if (program.promise) {
console.log(" --Testing promise style client");
testDriver = ThriftTestDriverPromise;
-}
+}
testDriver(client, function (status) {
console.log(status);
process.exit(0);