[THRIFT-4771] add nodejs THeaderProtocol support (#1743)

Client: nodejs
diff --git a/lib/nodejs/lib/thrift/binary_protocol.js b/lib/nodejs/lib/thrift/binary_protocol.js
index 6ab9c05..af8836c 100644
--- a/lib/nodejs/lib/thrift/binary_protocol.js
+++ b/lib/nodejs/lib/thrift/binary_protocol.js
@@ -33,6 +33,10 @@
     VERSION_1 = -2147418112, // 0x80010000
     TYPE_MASK = 0x000000ff;
 
+TBinaryProtocol.VERSION_MASK = VERSION_MASK; // statics expose the module-private constants to other modules
+TBinaryProtocol.VERSION_1 = VERSION_1;
+TBinaryProtocol.TYPE_MASK = TYPE_MASK;
+
 function TBinaryProtocol(trans, strictRead, strictWrite) {
   this.trans = trans;
   this.strictRead = (strictRead !== undefined ? strictRead : false);
diff --git a/lib/nodejs/lib/thrift/buffered_transport.js b/lib/nodejs/lib/thrift/buffered_transport.js
index a9e006e..113e216 100644
--- a/lib/nodejs/lib/thrift/buffered_transport.js
+++ b/lib/nodejs/lib/thrift/buffered_transport.js
@@ -19,6 +19,7 @@
 
 var binary = require('./binary');
 var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+var THeaderTransport = require('./header_transport');
 
 module.exports = TBufferedTransport;
 
@@ -33,6 +34,8 @@
   this.onFlush = callback;
 };
 
+TBufferedTransport.prototype = new THeaderTransport();
+
 TBufferedTransport.prototype.reset = function() {
   this.inBuf = new Buffer(this.defaultReadBufferSize);
   this.readCursor = 0;
diff --git a/lib/nodejs/lib/thrift/framed_transport.js b/lib/nodejs/lib/thrift/framed_transport.js
index 6947925..f7daa3f 100644
--- a/lib/nodejs/lib/thrift/framed_transport.js
+++ b/lib/nodejs/lib/thrift/framed_transport.js
@@ -19,6 +19,7 @@
 
 var binary = require('./binary');
 var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+var THeaderTransport = require('./header_transport');
 
 module.exports = TFramedTransport;
 
@@ -30,6 +31,8 @@
   this.onFlush = callback;
 };
 
+TFramedTransport.prototype = new THeaderTransport();
+
 TFramedTransport.receiver = function(callback, seqid) {
   var residual = null;
 
diff --git a/lib/nodejs/lib/thrift/header_protocol.js b/lib/nodejs/lib/thrift/header_protocol.js
new file mode 100644
index 0000000..0c3b0db
--- /dev/null
+++ b/lib/nodejs/lib/thrift/header_protocol.js
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var util = require('util');
+var TBinaryProtocol = require('./binary_protocol');
+var TCompactProtocol = require('./compact_protocol');
+var THeaderTransport = require('./header_transport');
+
+var ProtocolMap = {};
+ProtocolMap[THeaderTransport.SubprotocolId.BINARY] = TBinaryProtocol;
+ProtocolMap[THeaderTransport.SubprotocolId.COMPACT] = TCompactProtocol;
+
+module.exports = THeaderProtocol;
+
+function THeaderProtocolError(message) {
+  Error.call(this);
+  Error.captureStackTrace(this, this.constructor);
+  this.name = this.constructor.name;
+  this.message = message;
+}
+
+util.inherits(THeaderProtocolError, Error);
+
+/**
+ * A framed protocol with headers.
+ *
+ * THeaderProtocol frames other Thrift protocols and adds support for
+ * optional out-of-band headers. The currently supported subprotocols are
+ * TBinaryProtocol and TCompactProtocol. It can currently only be used with
+ * transports that inherit THeaderTransport.
+ *
+ * THeaderProtocol does not currently support THTTPServer, TNonblockingServer,
+ * or TProcessPoolServer.
+ *
+ * See doc/specs/HeaderFormat.md for details of the wire format.
+ */
+function THeaderProtocol(trans) {
+  if (!(trans instanceof THeaderTransport)) {
+    throw new THeaderProtocolError(
+      'Only transports that inherit THeaderTransport can be' +
+      ' used with THeaderProtocol'
+    );
+  }
+  this.trans = trans;
+  this.setProtocol();
+};
+
+THeaderProtocol.prototype.flush = function() {
+  // Headers are written at flush time, not earlier, because the frame-size
+  // field of the header depends on the length of the payload buffered so far.
+  // writeHeaders() then prepends the header ahead of that payload.
+  this.trans.writeHeaders();
+  return this.trans.flush();
+};
+
+THeaderProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
+  return this.protocol.writeMessageBegin(name, type, seqid);
+};
+
+THeaderProtocol.prototype.writeMessageEnd = function() {
+  return this.protocol.writeMessageEnd();
+};
+
+THeaderProtocol.prototype.writeStructBegin = function(name) {
+  return this.protocol.writeStructBegin(name);
+};
+
+THeaderProtocol.prototype.writeStructEnd = function() {
+  return this.protocol.writeStructEnd();
+};
+
+THeaderProtocol.prototype.writeFieldBegin = function(name, type, id) {
+  return this.protocol.writeFieldBegin(name, type, id); // forwarded to the wrapped subprotocol
+};
+
+THeaderProtocol.prototype.writeFieldEnd = function() {
+  return this.protocol.writeFieldEnd();
+};
+
+THeaderProtocol.prototype.writeFieldStop = function() {
+  return this.protocol.writeFieldStop();
+};
+
+THeaderProtocol.prototype.writeMapBegin = function(ktype, vtype, size) {
+  return this.protocol.writeMapBegin(ktype, vtype, size);
+};
+
+THeaderProtocol.prototype.writeMapEnd = function() {
+  return this.protocol.writeMapEnd();
+};
+
+THeaderProtocol.prototype.writeListBegin = function(etype, size) {
+  return this.protocol.writeListBegin(etype, size);
+};
+
+THeaderProtocol.prototype.writeListEnd = function() {
+  return this.protocol.writeListEnd();
+};
+
+THeaderProtocol.prototype.writeSetBegin = function(etype, size) {
+  return this.protocol.writeSetBegin(etype, size);
+};
+
+THeaderProtocol.prototype.writeSetEnd = function() {
+  return this.protocol.writeSetEnd();
+};
+
+THeaderProtocol.prototype.writeBool = function(b) {
+  return this.protocol.writeBool(b);
+};
+
+THeaderProtocol.prototype.writeByte = function(b) {
+  return this.protocol.writeByte(b);
+};
+
+THeaderProtocol.prototype.writeI16 = function(i16) {
+  return this.protocol.writeI16(i16);
+};
+
+THeaderProtocol.prototype.writeI32 = function(i32) {
+  return this.protocol.writeI32(i32);
+};
+
+THeaderProtocol.prototype.writeI64 = function(i64) {
+  return this.protocol.writeI64(i64);
+};
+
+THeaderProtocol.prototype.writeDouble = function(dub) {
+  return this.protocol.writeDouble(dub);
+};
+
+THeaderProtocol.prototype.writeStringOrBinary = function(name, encoding, arg) {
+  return this.protocol.writeStringOrBinary(name, encoding, arg);
+};
+
+THeaderProtocol.prototype.writeString = function(arg) {
+  return this.protocol.writeString(arg);
+};
+
+THeaderProtocol.prototype.writeBinary = function(arg) {
+  return this.protocol.writeBinary(arg);
+};
+
+THeaderProtocol.prototype.readMessageBegin = function() {
+  this.trans.readHeaders(); // may change the transport's protocol id
+  this.setProtocol(); // re-create the subprotocol for the id the transport now reports
+  return this.protocol.readMessageBegin();
+};
+
+THeaderProtocol.prototype.readMessageEnd = function() {
+  return this.protocol.readMessageEnd();
+};
+
+THeaderProtocol.prototype.readStructBegin = function() {
+  return this.protocol.readStructBegin();
+};
+
+THeaderProtocol.prototype.readStructEnd = function() {
+  return this.protocol.readStructEnd();
+};
+
+THeaderProtocol.prototype.readFieldBegin = function() {
+  return this.protocol.readFieldBegin();
+};
+
+THeaderProtocol.prototype.readFieldEnd = function() {
+  return this.protocol.readFieldEnd();
+};
+
+THeaderProtocol.prototype.readMapBegin = function() {
+  return this.protocol.readMapBegin();
+};
+
+THeaderProtocol.prototype.readMapEnd = function() {
+  return this.protocol.readMapEnd();
+};
+
+THeaderProtocol.prototype.readListBegin = function() {
+  return this.protocol.readListBegin();
+};
+
+THeaderProtocol.prototype.readListEnd = function() {
+  return this.protocol.readListEnd();
+};
+
+THeaderProtocol.prototype.readSetBegin = function() {
+  return this.protocol.readSetBegin();
+};
+
+THeaderProtocol.prototype.readSetEnd = function() {
+  return this.protocol.readSetEnd();
+};
+
+THeaderProtocol.prototype.readBool = function() {
+  return this.protocol.readBool();
+};
+
+THeaderProtocol.prototype.readByte = function() {
+  return this.protocol.readByte();
+};
+
+THeaderProtocol.prototype.readI16 = function() {
+  return this.protocol.readI16();
+};
+
+THeaderProtocol.prototype.readI32 = function() {
+  return this.protocol.readI32();
+};
+
+THeaderProtocol.prototype.readI64 = function() {
+  return this.protocol.readI64();
+};
+
+THeaderProtocol.prototype.readDouble = function() {
+  return this.protocol.readDouble();
+};
+
+THeaderProtocol.prototype.readBinary = function() {
+  return this.protocol.readBinary();
+};
+
+THeaderProtocol.prototype.readString = function() {
+  return this.protocol.readString();
+};
+
+THeaderProtocol.prototype.getTransport = function() {
+  return this.trans;
+};
+
+THeaderProtocol.prototype.skip = function(type) {
+  return this.protocol.skip(type);
+};
+
+THeaderProtocol.prototype.setProtocol = function(subProtocolId) {
+  // `subProtocolId` is ignored (as before): the transport's id is authoritative.
+  var protocolId = this.trans.getProtocolId();
+  if (!ProtocolMap[protocolId]) {
+    throw new THeaderProtocolError('Headers not supported for protocol ' + protocolId);
+  }
+  this.protocol = new ProtocolMap[protocolId](this.trans);
+};
diff --git a/lib/nodejs/lib/thrift/header_transport.js b/lib/nodejs/lib/thrift/header_transport.js
new file mode 100644
index 0000000..c5f133e
--- /dev/null
+++ b/lib/nodejs/lib/thrift/header_transport.js
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var util = require('util');
+var TCompactProtocol = require('./compact_protocol');
+var TBinaryProtocol = require('./binary_protocol');
+var InputBufferUnderrunError = require('./input_buffer_underrun_error');
+
+function THeaderTransportError(message) {
+  Error.call(this);
+  Error.captureStackTrace(this, this.constructor);
+  this.name = this.constructor.name;
+  this.message = message;
+}
+
+util.inherits(THeaderTransportError, Error);
+
+module.exports = THeaderTransport;
+
+// from HeaderFormat.md
+var COMPACT_PROTOCOL_OFFSET = 0;
+var COMPACT_PROTOCOL_VERSION_OFFSET = 1;
+var FRAME_SIZE_OFFSET = 0;
+var HEADER_MAGIC_OFFSET = 32 / 8;
+var FLAGS_OFFSET = 48 / 8;
+var SEQID_OFFSET = 64 / 8;
+var HEADER_SIZE_OFFSET = 96 / 8;
+var HEADER_START_OFFSET = 112 / 8;
+
+var HEADER_MAGIC = 0x0FFF;
+
+var TINFO_HEADER_KEY_VALUE_TYPE = 0x01;
+var MAX_FRAME_SIZE = 0x3FFFFFFF;
+
+// A helper class for reading/writing varints and strings. Uses a
+// TCompactProtocol over a private TBufferedTransport under the hood
+function VarintHelper(readBuffer) {
+  var TBufferedTransport = require('./buffered_transport'); // lazy: buffered_transport requires this module at load time
+  this.outputBuffer = null;
+  var _this = this;
+  this.transport = new TBufferedTransport(null, function(output) {
+    _this.outputBuffer = output;
+  });
+  // Seed the read side; presumably writeCursor marks all of inBuf as readable - confirm in TBufferedTransport.
+  this.transport.inBuf = readBuffer || Buffer.alloc(0);
+  this.transport.writeCursor = this.transport.inBuf.length;
+  this.protocol = new TCompactProtocol(this.transport);
+};
+
+VarintHelper.prototype.readVarint32 = function() {
+  return this.protocol.readVarint32();
+};
+
+VarintHelper.prototype.writeVarint32 = function(i) {
+  this.protocol.writeVarint32(i);
+};
+
+VarintHelper.prototype.readString = function() {
+  return this.protocol.readString();
+};
+
+VarintHelper.prototype.writeString = function(str) {
+  this.protocol.writeString(str); // encoded by the helper's TCompactProtocol
+};
+
+VarintHelper.prototype.getOutCount = function() {
+  return this.transport.outCount;
+};
+
+VarintHelper.prototype.write = function(str) {
+  this.transport.write(str);
+};
+
+VarintHelper.prototype.toBuffer = function() {
+  this.transport.flush();
+  return this.outputBuffer;
+};
+
+// from lib/cpp/src/thrift/protocol/TProtocolTypes.h
+THeaderTransport.SubprotocolId = {
+  BINARY: 0,
+  JSON: 1,
+  COMPACT: 2,
+};
+
+/**
+  An abstract transport used as a prototype for other transports
+  to enable reading/writing THeader headers. This should NOT be used as a standalone transport.
+  The methods in this transport are called by THeaderProtocol, which calls readHeaders in
+  readMessageBegin (before the message is read) and writeHeaders in flush (once the
+  payload is buffered), so headers are parsed from / written to each message.
+
+  The reason this is not a standalone transport type is because different transport types
+  have their own individual static receiver methods that are called prior to instantiation.
+  There doesn't seem to be a way for THeaderTransport to know which receiver method to use
+  without reworking the server API.
+
+  For reading headers from a request, the parsed headers can be retrieved via
+  getReadHeaders. Similarly, you can set headers to be written on the client via
+  setWriteHeader.
+ */
+function THeaderTransport() {
+  this.maxFrameSize = MAX_FRAME_SIZE;
+  this.protocolId = THeaderTransport.SubprotocolId.BINARY;
+  this.rheaders = {};
+  this.wheaders = {};
+  this.inBuf = Buffer.alloc(0);
+  this.outCount = 0;
+  this.flags = null;
+  this.seqid = 0;
+  this.shouldWriteHeaders = true;
+};
+
+var validateHeaders = function(key, value) {
+  if (typeof key !== 'string' || typeof value !== 'string') {
+    throw new THeaderTransportError('Header key and values must be strings');
+  }
+};
+
+var validateProtocolId = function(protocolId) {
+  var protocols = Object.keys(THeaderTransport.SubprotocolId);
+  for (var i = 0; i < protocols.length; i++) {
+    if (protocolId === THeaderTransport.SubprotocolId[protocols[i]]) return true;
+  }
+  // Not a known SubprotocolId value: fail with the transport's own error type (still an Error).
+  throw new THeaderTransportError(protocolId + ' is not a valid protocol id');
+};
+
+THeaderTransport.prototype.setSeqId = function(seqid) {
+  this.seqid = seqid;
+};
+
+THeaderTransport.prototype.getSeqId = function() {
+  return this.seqid; // last value given to setSeqId(); 0 until then
+};
+
+THeaderTransport.prototype.setFlags = function(flags) {
+  this.flags = flags;
+};
+
+THeaderTransport.prototype.getReadHeaders = function() {
+  return this.rheaders;
+};
+
+THeaderTransport.prototype.setReadHeader = function(key, value) {
+  validateHeaders(key, value);
+  this.rheaders[key] = value;
+};
+
+THeaderTransport.prototype.clearReadHeaders = function() {
+  this.rheaders = {};
+};
+
+THeaderTransport.prototype.getWriteHeaders = function() {
+  return this.wheaders;
+};
+
+THeaderTransport.prototype.setWriteHeader = function(key, value) {
+  validateHeaders(key, value);
+  this.wheaders[key] = value;
+};
+
+THeaderTransport.prototype.clearWriteHeaders = function() {
+  this.wheaders = {};
+};
+
+THeaderTransport.prototype.setMaxFrameSize = function(frameSize) {
+  this.maxFrameSize = frameSize;
+};
+
+THeaderTransport.prototype.setProtocolId = function(protocolId) {
+  validateProtocolId(protocolId);
+  this.protocolId = protocolId;
+};
+
+THeaderTransport.prototype.getProtocolId = function() {
+  return this.protocolId;
+};
+
+var isUnframedBinary = function(readBuffer) {
+  var version = readBuffer.readInt32BE(0); // leading 32-bit word; offset made explicit like the other reads
+  return (version & TBinaryProtocol.VERSION_MASK) === TBinaryProtocol.VERSION_1;
+};
+
+var isUnframedCompact = function(readBuffer) {
+  var protocolId = readBuffer.readInt8(COMPACT_PROTOCOL_OFFSET); // signed byte 0
+  var version = readBuffer.readInt8(COMPACT_PROTOCOL_VERSION_OFFSET); // signed byte 1
+  return protocolId === TCompactProtocol.PROTOCOL_ID &&
+    (version & TCompactProtocol.VERSION_MASK) === TCompactProtocol.VERSION_N;
+};
+
+THeaderTransport.prototype.readHeaders = function() {
+  // Parses the header frame at the start of `inBuf`. Returns the headers read,
+  // or undefined for unframed messages and frames without the header magic.
+  var readBuffer = this.inBuf;
+  // `rheaders` starts out on the prototype object the derived transports share;
+  // take a fresh own map so headers from earlier messages cannot leak in.
+  this.clearReadHeaders();
+
+  var isUnframed = false;
+  if (isUnframedBinary(readBuffer)) {
+    this.setProtocolId(THeaderTransport.SubprotocolId.BINARY);
+    isUnframed = true;
+  }
+  if (isUnframedCompact(readBuffer)) {
+    this.setProtocolId(THeaderTransport.SubprotocolId.COMPACT);
+    isUnframed = true;
+  }
+  if (isUnframed) {
+    this.shouldWriteHeaders = false;
+    return;
+  }
+
+  var frameSize = readBuffer.readInt32BE(FRAME_SIZE_OFFSET);
+  if (frameSize > this.maxFrameSize) {
+    throw new THeaderTransportError('Frame exceeds maximum frame size');
+  }
+
+  var headerMagic = readBuffer.readInt16BE(HEADER_MAGIC_OFFSET);
+  this.shouldWriteHeaders = headerMagic === HEADER_MAGIC;
+  if (!this.shouldWriteHeaders) {
+    return;
+  }
+
+  this.setFlags(readBuffer.readInt16BE(FLAGS_OFFSET));
+  this.setSeqId(readBuffer.readInt32BE(SEQID_OFFSET));
+  var headerSize = readBuffer.readInt16BE(HEADER_SIZE_OFFSET) * 4;
+  var endOfHeaders = HEADER_START_OFFSET + headerSize;
+  if (endOfHeaders > readBuffer.length) {
+    throw new THeaderTransportError('Header size is greater than frame size');
+  }
+
+  var headerBuffer = Buffer.alloc(headerSize);
+  readBuffer.copy(headerBuffer, 0, HEADER_START_OFFSET, endOfHeaders);
+  var varintHelper = new VarintHelper(headerBuffer);
+  this.setProtocolId(varintHelper.readVarint32());
+  var transformCount = varintHelper.readVarint32();
+  if (transformCount > 0) {
+    throw new THeaderTransportError('Transforms are not yet supported');
+  }
+
+  while (true) {
+    try {
+      var headerType = varintHelper.readVarint32();
+      if (headerType !== TINFO_HEADER_KEY_VALUE_TYPE) {
+        break;
+      }
+      var numberOfHeaders = varintHelper.readVarint32();
+      for (var i = 0; i < numberOfHeaders; i++) {
+        var key = varintHelper.readString();
+        var value = varintHelper.readString();
+        this.setReadHeader(key, value);
+      }
+    } catch (e) {
+      if (e instanceof InputBufferUnderrunError) {
+        break;
+      }
+      throw e;
+    }
+  }
+  // moves the read cursor past the headers
+  this.read(endOfHeaders);
+  return this.getReadHeaders();
+};
+
+THeaderTransport.prototype.writeHeaders = function() {
+  // Builds the header frame and prepends it to the payload held in `outBuffers`.
+  // only write headers on the server if the client contained headers
+  if (!this.shouldWriteHeaders) {
+    return;
+  }
+  var headers = this.getWriteHeaders();
+
+  var varintWriter = new VarintHelper();
+  varintWriter.writeVarint32(this.protocolId);
+  varintWriter.writeVarint32(0); // transforms not supported
+
+  // writing info header key values
+  var headerKeys = Object.keys(headers);
+  if (headerKeys.length > 0) {
+    varintWriter.writeVarint32(TINFO_HEADER_KEY_VALUE_TYPE);
+    varintWriter.writeVarint32(headerKeys.length);
+    for (var i = 0; i < headerKeys.length; i++) {
+      var key = headerKeys[i];
+      var value = headers[key];
+      varintWriter.writeString(key);
+      varintWriter.writeString(value);
+    }
+  }
+  var headerSizeWithoutPadding = varintWriter.getOutCount();
+  var paddingNeeded = (4 - (headerSizeWithoutPadding % 4)) % 4;
+
+  var headerSize = Buffer.alloc(2);
+  headerSize.writeInt16BE(Math.floor((headerSizeWithoutPadding + paddingNeeded) / 4));
+
+  // pad the header to a multiple of 4 bytes; Buffer.alloc() already zero-fills
+  var paddingBuffer = Buffer.alloc(paddingNeeded);
+  varintWriter.write(paddingBuffer);
+  var headerContentBuffer = varintWriter.toBuffer();
+  var frameSize = Buffer.alloc(4);
+  frameSize.writeInt32BE(10 + this.outCount + headerContentBuffer.length); // 10 = magic + flags + seqid + header size
+  var headerMagic = Buffer.alloc(2);
+  headerMagic.writeInt16BE(HEADER_MAGIC);
+
+  // flags are not yet supported, so write a zero
+  var flags = Buffer.alloc(2);
+  flags.writeInt16BE(0);
+
+  var seqid = Buffer.alloc(4);
+  seqid.writeInt32BE(this.getSeqId());
+
+  var headerBuffer = Buffer.concat([
+    frameSize,
+    headerMagic,
+    flags,
+    seqid,
+    headerSize,
+    headerContentBuffer,
+  ]);
+
+  this.outBuffers.unshift(headerBuffer);
+  this.outCount += headerBuffer.length;
+};
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index b09953d..0a2d02b 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -72,3 +72,4 @@
 exports.TBinaryProtocol = require('./binary_protocol');
 exports.TJSONProtocol = require('./json_protocol');
 exports.TCompactProtocol = require('./compact_protocol');
+exports.THeaderProtocol = require('./header_protocol');
diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js
index e124acc..16b74ea 100644
--- a/lib/nodejs/lib/thrift/server.js
+++ b/lib/nodejs/lib/thrift/server.js
@@ -23,6 +23,7 @@
 
 var TBufferedTransport = require('./buffered_transport');
 var TBinaryProtocol = require('./binary_protocol');
+var THeaderProtocol = require('./header_protocol');
 var InputBufferUnderrunError = require('./input_buffer_underrun_error');
 
 /**
@@ -43,14 +44,23 @@
     });
     stream.on('data', transport.receiver(function(transportWithData) {
       var input = new protocol(transportWithData);
-      var output = new protocol(new transport(undefined, function(buf) {
+      var outputCb = function(buf) {
         try {
             stream.write(buf);
         } catch (err) {
             self.emit('error', err);
             stream.end();
         }
-      }));
+      };
+
+      var output = new protocol(new transport(undefined, outputCb));
+      // Read and write need to be performed on the same transport
+      // for THeaderProtocol because we should only respond with
+      // headers if the request contains headers
+      if (protocol === THeaderProtocol) {
+        output = input;
+        output.trans.onFlush = outputCb;
+      }
 
       try {
         do {
diff --git a/lib/nodejs/test/header.test.js b/lib/nodejs/test/header.test.js
new file mode 100644
index 0000000..efd7f81
--- /dev/null
+++ b/lib/nodejs/test/header.test.js
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const TFramedTransport = require("../lib/thrift/framed_transport");
+const THeaderTransport = require("../lib/thrift/header_transport");
+const THeaderProtocol = require("../lib/thrift/header_protocol");
+const thrift = require("../lib/thrift");
+const fs = require("fs");
+const test = require("tape");
+const path = require("path");
+
+const headerPayload = fs.readFileSync(
+  path.join(__dirname, "test_header_payload")
+);
+
+const cases = {
+  "Should read headers from payload": function(assert) {
+    const transport = new TFramedTransport();
+    transport.inBuf = Buffer.from(headerPayload);
+
+    const headers = transport.readHeaders();
+    assert.equals(headers.Parent, "shoobar");
+    assert.equals(headers.Trace, "abcde");
+    assert.end();
+  },
+  "Should read headers when reading message begin": function(assert) {
+    const transport = new TFramedTransport();
+    transport.inBuf = Buffer.from(headerPayload);
+    const protocol = new THeaderProtocol(transport);
+    const result = protocol.readMessageBegin();
+
+    const headers = transport.getReadHeaders();
+    assert.equals(headers.Parent, "shoobar");
+    assert.equals(headers.Trace, "abcde");
+    assert.equals(result.fname, "add");
+    assert.equals(result.mtype, thrift.Thrift.MessageType.CALL);
+    assert.end();
+  },
+  "Should be able to write headers": function(assert) {
+    const writeTransport = new TFramedTransport();
+    writeTransport.setProtocolId(THeaderTransport.SubprotocolId.BINARY);
+    writeTransport.setWriteHeader("Hihihihi", "hohohoho");
+    writeTransport.setWriteHeader("boobooboo", "fooshoopoo");
+    writeTransport.setWriteHeader("a", "z");
+    writeTransport.writeHeaders();
+    const writeBuffer = writeTransport.outBuffers[0];
+
+    const readTransport = new TFramedTransport();
+    readTransport.inBuf = writeBuffer;
+    readTransport.readHeaders();
+
+    const headers = readTransport.getReadHeaders();
+    assert.equals(headers.Hihihihi, "hohohoho");
+    assert.equals(headers.boobooboo, "fooshoopoo");
+    assert.equals(headers.a, "z");
+    assert.end();
+  }
+};
+
+Object.keys(cases).forEach(function(caseName) {
+  test(caseName, cases[caseName]);
+});
diff --git a/lib/nodejs/test/helpers.js b/lib/nodejs/test/helpers.js
index 72d128d..f3c27b3 100644
--- a/lib/nodejs/test/helpers.js
+++ b/lib/nodejs/test/helpers.js
@@ -28,7 +28,8 @@
 module.exports.protocols = {
   json: thrift.TJSONProtocol,
   binary: thrift.TBinaryProtocol,
-  compact: thrift.TCompactProtocol
+  compact: thrift.TCompactProtocol,
+  header: thrift.THeaderProtocol
 };
 
 module.exports.ecmaMode = process.argv.includes("--es6") ? "es6" : "es5";
diff --git a/lib/nodejs/test/test_header_payload b/lib/nodejs/test/test_header_payload
new file mode 100644
index 0000000..22d5ef7
--- /dev/null
+++ b/lib/nodejs/test/test_header_payload
Binary files differ