[THRIFT-4771] add nodejs THeaderProtocol support (#1743)
Client: nodejs
diff --git a/lib/nodejs/lib/thrift/binary_protocol.js b/lib/nodejs/lib/thrift/binary_protocol.js
index 6ab9c05..af8836c 100644
--- a/lib/nodejs/lib/thrift/binary_protocol.js
+++ b/lib/nodejs/lib/thrift/binary_protocol.js
@@ -33,6 +33,10 @@
VERSION_1 = -2147418112, // 0x80010000
TYPE_MASK = 0x000000ff;
+TBinaryProtocol.VERSION_MASK = VERSION_MASK; // statics; header_transport.js reads VERSION_MASK and VERSION_1
+TBinaryProtocol.VERSION_1 = VERSION_1;
+TBinaryProtocol.TYPE_MASK = TYPE_MASK;
+
function TBinaryProtocol(trans, strictRead, strictWrite) {
this.trans = trans;
this.strictRead = (strictRead !== undefined ? strictRead : false);
diff --git a/lib/nodejs/lib/thrift/buffered_transport.js b/lib/nodejs/lib/thrift/buffered_transport.js
index a9e006e..113e216 100644
--- a/lib/nodejs/lib/thrift/buffered_transport.js
+++ b/lib/nodejs/lib/thrift/buffered_transport.js
@@ -19,6 +19,7 @@
var binary = require('./binary');
var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+var THeaderTransport = require('./header_transport');
module.exports = TBufferedTransport;
@@ -33,6 +34,8 @@
this.onFlush = callback;
};
+TBufferedTransport.prototype = new THeaderTransport();
+
TBufferedTransport.prototype.reset = function() {
this.inBuf = new Buffer(this.defaultReadBufferSize);
this.readCursor = 0;
diff --git a/lib/nodejs/lib/thrift/framed_transport.js b/lib/nodejs/lib/thrift/framed_transport.js
index 6947925..f7daa3f 100644
--- a/lib/nodejs/lib/thrift/framed_transport.js
+++ b/lib/nodejs/lib/thrift/framed_transport.js
@@ -19,6 +19,7 @@
var binary = require('./binary');
var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+var THeaderTransport = require('./header_transport');
module.exports = TFramedTransport;
@@ -30,6 +31,8 @@
this.onFlush = callback;
};
+TFramedTransport.prototype = new THeaderTransport();
+
TFramedTransport.receiver = function(callback, seqid) {
var residual = null;
diff --git a/lib/nodejs/lib/thrift/header_protocol.js b/lib/nodejs/lib/thrift/header_protocol.js
new file mode 100644
index 0000000..0c3b0db
--- /dev/null
+++ b/lib/nodejs/lib/thrift/header_protocol.js
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var util = require('util');
+var TBinaryProtocol = require('./binary_protocol');
+var TCompactProtocol = require('./compact_protocol');
+var THeaderTransport = require('./header_transport');
+
+var ProtocolMap = {};
+ProtocolMap[THeaderTransport.SubprotocolId.BINARY] = TBinaryProtocol;
+ProtocolMap[THeaderTransport.SubprotocolId.COMPACT] = TCompactProtocol;
+
+module.exports = THeaderProtocol;
+
+function THeaderProtocolError(message) {
+ Error.call(this);
+ Error.captureStackTrace(this, this.constructor);
+ this.name = this.constructor.name;
+ this.message = message;
+}
+
+util.inherits(THeaderProtocolError, Error);
+
+/**
+ * A framed protocol with headers.
+ *
+ * THeaderProtocol frames other Thrift protocols and adds support for
+ * optional out-of-band headers. The currently supported subprotocols are
+ * TBinaryProtocol and TCompactProtocol. It can currently only be used with
+ * transports that inherit THeaderTransport.
+ *
+ * THeaderProtocol does not currently support THTTPServer, TNonblockingServer,
+ * or TProcessPoolServer.
+ *
+ * See doc/specs/HeaderFormat.md for details of the wire format.
+ */
+function THeaderProtocol(trans) {
+ if (!(trans instanceof THeaderTransport)) {
+ throw new THeaderProtocolError(
+ 'Only transports that inherit THeaderTransport can be' +
+ ' used with THeaderProtocol'
+ );
+ }
+ this.trans = trans;
+ this.setProtocol();
+};
+
+THeaderProtocol.prototype.flush = function() {
+ // Headers must be written prior to flushing because the
+ // frame-size field of the header is computed from the length
+ // of the payload buffered so far
+ this.trans.writeHeaders();
+ return this.trans.flush();
+};
+
+THeaderProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
+ return this.protocol.writeMessageBegin(name, type, seqid);
+};
+
+THeaderProtocol.prototype.writeMessageEnd = function() {
+ return this.protocol.writeMessageEnd();
+};
+
+THeaderProtocol.prototype.writeStructBegin = function(name) {
+ return this.protocol.writeStructBegin(name);
+};
+
+THeaderProtocol.prototype.writeStructEnd = function() {
+ return this.protocol.writeStructEnd();
+};
+
+THeaderProtocol.prototype.writeFieldBegin = function(name, type, id) {
+ return this.protocol.writeFieldBegin(name, type, id);
+}
+
+THeaderProtocol.prototype.writeFieldEnd = function() {
+ return this.protocol.writeFieldEnd();
+};
+
+THeaderProtocol.prototype.writeFieldStop = function() {
+ return this.protocol.writeFieldStop();
+};
+
+THeaderProtocol.prototype.writeMapBegin = function(ktype, vtype, size) {
+ return this.protocol.writeMapBegin(ktype, vtype, size);
+};
+
+THeaderProtocol.prototype.writeMapEnd = function() {
+ return this.protocol.writeMapEnd();
+};
+
+THeaderProtocol.prototype.writeListBegin = function(etype, size) {
+ return this.protocol.writeListBegin(etype, size);
+};
+
+THeaderProtocol.prototype.writeListEnd = function() {
+ return this.protocol.writeListEnd();
+};
+
+THeaderProtocol.prototype.writeSetBegin = function(etype, size) {
+ return this.protocol.writeSetBegin(etype, size);
+};
+
+THeaderProtocol.prototype.writeSetEnd = function() {
+ return this.protocol.writeSetEnd();
+};
+
+THeaderProtocol.prototype.writeBool = function(b) {
+ return this.protocol.writeBool(b);
+};
+
+THeaderProtocol.prototype.writeByte = function(b) {
+ return this.protocol.writeByte(b);
+};
+
+THeaderProtocol.prototype.writeI16 = function(i16) {
+ return this.protocol.writeI16(i16);
+};
+
+THeaderProtocol.prototype.writeI32 = function(i32) {
+ return this.protocol.writeI32(i32);
+};
+
+THeaderProtocol.prototype.writeI64 = function(i64) {
+ return this.protocol.writeI64(i64);
+};
+
+THeaderProtocol.prototype.writeDouble = function(dub) {
+ return this.protocol.writeDouble(dub);
+};
+
+THeaderProtocol.prototype.writeStringOrBinary = function(name, encoding, arg) {
+ return this.protocol.writeStringOrBinary(name, encoding, arg);
+};
+
+THeaderProtocol.prototype.writeString = function(arg) {
+ return this.protocol.writeString(arg);
+};
+
+THeaderProtocol.prototype.writeBinary = function(arg) {
+ return this.protocol.writeBinary(arg);
+};
+
+THeaderProtocol.prototype.readMessageBegin = function() {
+ this.trans.readHeaders();
+ this.setProtocol();
+ return this.protocol.readMessageBegin();
+};
+
+THeaderProtocol.prototype.readMessageEnd = function() {
+ return this.protocol.readMessageEnd();
+};
+
+THeaderProtocol.prototype.readStructBegin = function() {
+ return this.protocol.readStructBegin();
+};
+
+THeaderProtocol.prototype.readStructEnd = function() {
+ return this.protocol.readStructEnd();
+};
+
+THeaderProtocol.prototype.readFieldBegin = function() {
+ return this.protocol.readFieldBegin();
+};
+
+THeaderProtocol.prototype.readFieldEnd = function() {
+ return this.protocol.readFieldEnd();
+};
+
+THeaderProtocol.prototype.readMapBegin = function() {
+ return this.protocol.readMapBegin();
+};
+
+THeaderProtocol.prototype.readMapEnd = function() {
+ return this.protocol.readMapEnd();
+};
+
+THeaderProtocol.prototype.readListBegin = function() {
+ return this.protocol.readListBegin();
+};
+
+THeaderProtocol.prototype.readListEnd = function() {
+ return this.protocol.readListEnd();
+};
+
+THeaderProtocol.prototype.readSetBegin = function() {
+ return this.protocol.readSetBegin();
+};
+
+THeaderProtocol.prototype.readSetEnd = function() {
+ return this.protocol.readSetEnd();
+};
+
+THeaderProtocol.prototype.readBool = function() {
+ return this.protocol.readBool();
+};
+
+THeaderProtocol.prototype.readByte = function() {
+ return this.protocol.readByte();
+};
+
+THeaderProtocol.prototype.readI16 = function() {
+ return this.protocol.readI16();
+};
+
+THeaderProtocol.prototype.readI32 = function() {
+ return this.protocol.readI32();
+};
+
+THeaderProtocol.prototype.readI64 = function() {
+ return this.protocol.readI64();
+};
+
+THeaderProtocol.prototype.readDouble = function() {
+ return this.protocol.readDouble();
+};
+
+THeaderProtocol.prototype.readBinary = function() {
+ return this.protocol.readBinary();
+};
+
+THeaderProtocol.prototype.readString = function() {
+ return this.protocol.readString();
+};
+
+THeaderProtocol.prototype.getTransport = function() {
+ return this.trans;
+};
+
+THeaderProtocol.prototype.skip = function(type) {
+ return this.protocol.skip(type);
+};
+
+// Instantiates the sub-protocol for the transport's current protocol id (the argument is not consulted).
+THeaderProtocol.prototype.setProtocol = function(subProtocolId) {
+  var protocolId = this.trans.getProtocolId();
+  if (!ProtocolMap[protocolId]) {
+    throw new THeaderProtocolError('Headers not supported for protocol ' + protocolId);
+  }
+  this.protocol = new ProtocolMap[protocolId](this.trans);
+};
diff --git a/lib/nodejs/lib/thrift/header_transport.js b/lib/nodejs/lib/thrift/header_transport.js
new file mode 100644
index 0000000..c5f133e
--- /dev/null
+++ b/lib/nodejs/lib/thrift/header_transport.js
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var util = require('util');
+var TCompactProtocol = require('./compact_protocol');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+function THeaderTransportError(message) {
+ Error.call(this);
+ Error.captureStackTrace(this, this.constructor);
+ this.name = this.constructor.name;
+ this.message = message;
+}
+
+util.inherits(THeaderTransportError, Error);
+
+module.exports = THeaderTransport;
+
+// from HeaderFormat.md
+var COMPACT_PROTOCOL_OFFSET = 0;
+var COMPACT_PROTOCOL_VERSION_OFFSET = 1;
+var FRAME_SIZE_OFFSET = 0;
+var HEADER_MAGIC_OFFSET = 32 / 8;
+var FLAGS_OFFSET = 48 / 8;
+var SEQID_OFFSET = 64 / 8;
+var HEADER_SIZE_OFFSET = 96 / 8;
+var HEADER_START_OFFSET = 112 / 8;
+
+var HEADER_MAGIC = 0x0FFF;
+
+var TINFO_HEADER_KEY_VALUE_TYPE = 0x01;
+var MAX_FRAME_SIZE = 0x3FFFFFFF;
+
+ // A helper class for reading/writing varints. Uses
+ // TCompactProtocol under the hood
+function VarintHelper(readBuffer) {
+ var TBufferedTransport = require('./buffered_transport');
+ this.outputBuffer = null;
+ var _this = this;
+ this.transport = new TBufferedTransport(null, function(output) {
+ _this.outputBuffer = output;
+ });
+
+ this.transport.inBuf = readBuffer || Buffer.alloc(0);
+ this.transport.writeCursor = this.transport.inBuf.length;
+ this.protocol = new TCompactProtocol(this.transport);
+};
+
+VarintHelper.prototype.readVarint32 = function() {
+ return this.protocol.readVarint32();
+};
+
+VarintHelper.prototype.writeVarint32 = function(i) {
+ this.protocol.writeVarint32(i);
+};
+
+VarintHelper.prototype.readString = function() {
+ return this.protocol.readString();
+};
+
+VarintHelper.prototype.writeString = function(str) {
+ this.protocol.writeString(str);
+}
+
+VarintHelper.prototype.getOutCount = function() {
+ return this.transport.outCount;
+};
+
+VarintHelper.prototype.write = function(str) {
+ this.transport.write(str);
+};
+
+VarintHelper.prototype.toBuffer = function() {
+ this.transport.flush();
+ return this.outputBuffer;
+};
+
+// from lib/cpp/src/thrift/protocol/TProtocolTypes.h
+THeaderTransport.SubprotocolId = {
+ BINARY: 0,
+ JSON: 1,
+ COMPACT: 2,
+};
+
+/**
+ An abstract transport used as a prototype for other transports
+ to enable reading/writing THeader headers. This should NOT be used as a standalone transport.
+ The methods in this transport are called by THeaderProtocol, which calls readHeaders in
+ readMessageBegin to parse headers from a request before reading it, and writeHeaders in
+ flush to prepend headers to the buffered output before it is sent.
+
+ The reason this is not a standalone transport type is because different transport types
+ have their own individual static receiver methods that are called prior to instantiation.
+ There doesn't seem to be a way for THeaderTransport to know which receiver method to use
+ without reworking the server API.
+
+ For reading headers from a request, the parsed headers can be retrieved via
+ getReadHeaders. Similarly, you can set headers to be written on the client via
+ setWriteHeader.
+ */
+function THeaderTransport() {
+ this.maxFrameSize = MAX_FRAME_SIZE;
+ this.protocolId = THeaderTransport.SubprotocolId.BINARY;
+ this.rheaders = {};
+ this.wheaders = {};
+ this.inBuf = Buffer.alloc(0);
+ this.outCount = 0;
+ this.flags = null;
+ this.seqid = 0;
+ this.shouldWriteHeaders = true;
+};
+
+var validateHeaders = function(key, value) {
+ if (typeof key !== 'string' || typeof value !== 'string') {
+ throw new THeaderTransportError('Header key and values must be strings');
+ }
+};
+
+// Returns true for a known THeaderTransport.SubprotocolId value; throws THeaderTransportError otherwise.
+var validateProtocolId = function(protocolId) {
+  var protocols = Object.keys(THeaderTransport.SubprotocolId);
+  for (var i = 0; i < protocols.length; i++) {
+    if (protocolId === THeaderTransport.SubprotocolId[protocols[i]]) return true;
+  }
+  throw new THeaderTransportError(protocolId + ' is not a valid protocol id');
+};
+
+THeaderTransport.prototype.setSeqId = function(seqid) {
+ this.seqid = seqid;
+};
+
+THeaderTransport.prototype.getSeqId = function(seqid) {
+ return this.seqid;
+};
+
+THeaderTransport.prototype.setFlags = function(flags) {
+ this.flags = flags;
+};
+
+THeaderTransport.prototype.getReadHeaders = function() {
+ return this.rheaders;
+};
+
+THeaderTransport.prototype.setReadHeader = function(key, value) {
+ validateHeaders(key, value);
+ this.rheaders[key] = value;
+};
+
+THeaderTransport.prototype.clearReadHeaders = function() {
+ this.rheaders = {};
+};
+
+THeaderTransport.prototype.getWriteHeaders = function() {
+ return this.wheaders;
+};
+
+THeaderTransport.prototype.setWriteHeader = function(key, value) {
+ validateHeaders(key, value);
+ this.wheaders[key] = value;
+};
+
+THeaderTransport.prototype.clearWriteHeaders = function() {
+ this.wheaders = {};
+};
+
+THeaderTransport.prototype.setMaxFrameSize = function(frameSize) {
+ this.maxFrameSize = frameSize;
+};
+
+THeaderTransport.prototype.setProtocolId = function(protocolId) {
+ validateProtocolId(protocolId);
+ this.protocolId = protocolId;
+};
+
+THeaderTransport.prototype.getProtocolId = function() {
+ return this.protocolId;
+};
+
+var isUnframedBinary = function(readBuffer) {
+  // True when the buffer starts with a TBinaryProtocol VERSION_1 word; offset passed explicitly like the other reads.
+  return (readBuffer.readInt32BE(0) & TBinaryProtocol.VERSION_MASK) === TBinaryProtocol.VERSION_1;
+};
+
+var isUnframedCompact = function(readBuffer) {
+ var protocolId = readBuffer.readInt8(COMPACT_PROTOCOL_OFFSET);
+ var version = readBuffer.readInt8(COMPACT_PROTOCOL_VERSION_OFFSET);
+ return protocolId === TCompactProtocol.PROTOCOL_ID &&
+ (version & TCompactProtocol.VERSION_MASK) === TCompactProtocol.VERSION_N;
+}
+
+// Parses the THeader preamble at the start of this.inBuf, recording flags, seqid, protocol id and
+// key/value headers on the transport. Unframed binary/compact input or a missing header magic turns
+// header writing off and returns undefined; otherwise the parsed read headers are returned.
+THeaderTransport.prototype.readHeaders = function() {
+  var readBuffer = this.inBuf;
+  // Forget headers from earlier messages; rheaders may otherwise be a map shared via the prototype.
+  this.clearReadHeaders();
+
+  var isUnframed = false;
+  if (isUnframedBinary(readBuffer)) {
+    this.setProtocolId(THeaderTransport.SubprotocolId.BINARY);
+    isUnframed = true;
+  }
+  if (isUnframedCompact(readBuffer)) {
+    this.setProtocolId(THeaderTransport.SubprotocolId.COMPACT);
+    isUnframed = true;
+  }
+  if (isUnframed) {
+    this.shouldWriteHeaders = false;
+    return;
+  }
+
+  var frameSize = readBuffer.readInt32BE(FRAME_SIZE_OFFSET);
+  if (frameSize > this.maxFrameSize) {
+    throw new THeaderTransportError('Frame exceeds maximum frame size');
+  }
+
+  var headerMagic = readBuffer.readInt16BE(HEADER_MAGIC_OFFSET);
+  this.shouldWriteHeaders = headerMagic === HEADER_MAGIC;
+  if (!this.shouldWriteHeaders) {
+    return;
+  }
+
+  this.setFlags(readBuffer.readInt16BE(FLAGS_OFFSET));
+  this.setSeqId(readBuffer.readInt32BE(SEQID_OFFSET));
+  var headerSize = readBuffer.readInt16BE(HEADER_SIZE_OFFSET) * 4;
+  var endOfHeaders = HEADER_START_OFFSET + headerSize;
+  if (endOfHeaders > readBuffer.length) {
+    throw new THeaderTransportError('Header size is greater than frame size');
+  }
+
+  var headerBuffer = Buffer.alloc(headerSize);
+  readBuffer.copy(headerBuffer, 0, HEADER_START_OFFSET, endOfHeaders);
+  var varintHelper = new VarintHelper(headerBuffer);
+  this.setProtocolId(varintHelper.readVarint32());
+  var transformCount = varintHelper.readVarint32();
+  if (transformCount > 0) {
+    throw new THeaderTransportError('Transforms are not yet supported');
+  }
+
+  while (true) {
+    try {
+      var headerType = varintHelper.readVarint32();
+      if (headerType !== TINFO_HEADER_KEY_VALUE_TYPE) {
+        break;
+      }
+      var numberOfHeaders = varintHelper.readVarint32();
+      for (var i = 0; i < numberOfHeaders; i++) {
+        var key = varintHelper.readString();
+        var value = varintHelper.readString();
+        this.setReadHeader(key, value);
+      }
+    } catch (e) {
+      if (e instanceof InputBufferUnderrunError) {
+        break;
+      }
+      throw e;
+    }
+  }
+  // moves the read cursor past the headers
+  this.read(endOfHeaders);
+  return this.getReadHeaders();
+};
+
+THeaderTransport.prototype.writeHeaders = function() {
+ // only write headers on the server if the client contained headers
+ if (!this.shouldWriteHeaders) {
+ return;
+ }
+ var headers = this.getWriteHeaders();
+
+ var varintWriter = new VarintHelper();
+ varintWriter.writeVarint32(this.protocolId);
+ varintWriter.writeVarint32(0); // transforms not supported
+
+ // writing info header key values
+ var headerKeys = Object.keys(headers);
+ if (headerKeys.length > 0) {
+ varintWriter.writeVarint32(TINFO_HEADER_KEY_VALUE_TYPE);
+ varintWriter.writeVarint32(headerKeys.length);
+ for (var i = 0; i < headerKeys.length; i++) {
+ var key = headerKeys[i];
+ var value = headers[key];
+
+ varintWriter.writeString(key);
+ varintWriter.writeString(value);
+ }
+ }
+ var headerSizeWithoutPadding = varintWriter.getOutCount();
+ var paddingNeeded = (4 - (headerSizeWithoutPadding % 4)) % 4;
+
+ var headerSize = Buffer.alloc(2);
+ headerSize.writeInt16BE(Math.floor((headerSizeWithoutPadding + paddingNeeded) / 4));
+
+ var paddingBuffer = Buffer.alloc(paddingNeeded);
+ paddingBuffer.fill(0x00);
+ varintWriter.write(paddingBuffer);
+ var headerContentBuffer = varintWriter.toBuffer();
+ var frameSize = Buffer.alloc(4);
+ frameSize.writeInt32BE(10 + this.outCount + headerContentBuffer.length);
+ var headerMagic = Buffer.alloc(2);
+ headerMagic.writeInt16BE(HEADER_MAGIC);
+
+ // flags are not yet supported, so write a zero
+ var flags = Buffer.alloc(2);
+ flags.writeInt16BE(0);
+
+ var seqid = Buffer.alloc(4);
+ seqid.writeInt32BE(this.getSeqId());
+
+ var headerBuffer = Buffer.concat([
+ frameSize,
+ headerMagic,
+ flags,
+ seqid,
+ headerSize,
+ headerContentBuffer,
+ ]);
+
+ this.outBuffers.unshift(headerBuffer);
+ this.outCount += headerBuffer.length;
+};
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index b09953d..0a2d02b 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -72,3 +72,4 @@
exports.TBinaryProtocol = require('./binary_protocol');
exports.TJSONProtocol = require('./json_protocol');
exports.TCompactProtocol = require('./compact_protocol');
+exports.THeaderProtocol = require('./header_protocol');
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index e124acc..16b74ea 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -23,6 +23,7 @@
var TBufferedTransport = require('./buffered_transport');
var TBinaryProtocol = require('./binary_protocol');
+var THeaderProtocol = require('./header_protocol');
var InputBufferUnderrunError = require('./input_buffer_underrun_error');
/**
@@ -43,14 +44,23 @@
});
stream.on('data', transport.receiver(function(transportWithData) {
var input = new protocol(transportWithData);
- var output = new protocol(new transport(undefined, function(buf) {
+ var outputCb = function(buf) {
try {
stream.write(buf);
} catch (err) {
self.emit('error', err);
stream.end();
}
- }));
+ };
+
+ var output = new protocol(new transport(undefined, outputCb));
+ // Read and write need to be performed on the same transport
+ // for THeaderProtocol because we should only respond with
+ // headers if the request contains headers
+ if (protocol === THeaderProtocol) {
+ output = input;
+ output.trans.onFlush = outputCb;
+ }
try {
do {
diff --git a/lib/nodejs/test/header.test.js b/lib/nodejs/test/header.test.js
new file mode 100644
index 0000000..efd7f81
--- /dev/null
+++ b/lib/nodejs/test/header.test.js
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const TFramedTransport = require("../lib/thrift/framed_transport");
+const THeaderTransport = require("../lib/thrift/header_transport");
+const THeaderProtocol = require("../lib/thrift/header_protocol");
+const thrift = require("../lib/thrift");
+const fs = require("fs");
+const test = require("tape");
+const path = require("path");
+
+const headerPayload = fs.readFileSync(
+ path.join(__dirname, "test_header_payload")
+);
+
+const cases = {
+ "Should read headers from payload": function(assert) {
+ const transport = new TFramedTransport();
+ transport.inBuf = Buffer.from(headerPayload);
+
+ const headers = transport.readHeaders();
+ assert.equals(headers.Parent, "shoobar");
+ assert.equals(headers.Trace, "abcde");
+ assert.end();
+ },
+ "Should read headers when reading message begin": function(assert) {
+ const transport = new TFramedTransport();
+ transport.inBuf = Buffer.from(headerPayload);
+ const protocol = new THeaderProtocol(transport);
+ const result = protocol.readMessageBegin();
+
+ const headers = transport.getReadHeaders();
+ assert.equals(headers.Parent, "shoobar");
+ assert.equals(headers.Trace, "abcde");
+ assert.equals(result.fname, "add");
+ assert.equals(result.mtype, thrift.Thrift.MessageType.CALL);
+ assert.end();
+ },
+ "Should be able to write headers": function(assert) {
+ const writeTransport = new TFramedTransport();
+ writeTransport.setProtocolId(THeaderTransport.SubprotocolId.BINARY);
+ writeTransport.setWriteHeader("Hihihihi", "hohohoho");
+ writeTransport.setWriteHeader("boobooboo", "fooshoopoo");
+ writeTransport.setWriteHeader("a", "z");
+ writeTransport.writeHeaders();
+ const writeBuffer = writeTransport.outBuffers[0];
+
+ const readTransport = new TFramedTransport();
+ readTransport.inBuf = writeBuffer;
+ readTransport.readHeaders();
+
+ const headers = readTransport.getReadHeaders();
+ assert.equals(headers.Hihihihi, "hohohoho");
+ assert.equals(headers.boobooboo, "fooshoopoo");
+ assert.equals(headers.a, "z");
+ assert.end();
+ }
+};
+
+Object.keys(cases).forEach(function(caseName) {
+ test(caseName, cases[caseName]);
+});
diff --git a/lib/nodejs/test/helpers.js b/lib/nodejs/test/helpers.js
index 72d128d..f3c27b3 100644
--- a/lib/nodejs/test/helpers.js
+++ b/lib/nodejs/test/helpers.js
@@ -28,7 +28,8 @@
module.exports.protocols = {
json: thrift.TJSONProtocol,
binary: thrift.TBinaryProtocol,
- compact: thrift.TCompactProtocol
+ compact: thrift.TCompactProtocol,
+ header: thrift.THeaderProtocol
};
module.exports.ecmaMode = process.argv.includes("--es6") ? "es6" : "es5";
diff --git a/lib/nodejs/test/test_header_payload b/lib/nodejs/test/test_header_payload
new file mode 100644
index 0000000..22d5ef7
--- /dev/null
+++ b/lib/nodejs/test/test_header_payload
Binary files differ