THRIFT-1261 STDIO support for node-thrift
Patch: Jordan Shaw
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1183221 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index 3530227..b7f9b7a 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -128,3 +128,115 @@
return client;
}
+
+var child_process = require('child_process');
+var StdIOConnection = exports.StdIOConnection = function(command, options) {
+ var command_parts = command.split(' ');
+ command = command_parts[0];
+ var args = command_parts.splice(1,command_parts.length -1);
+ var child = this.child = child_process.spawn(command,args);
+
+ var self = this;
+ EventEmitter.call(this);
+
+ this._debug = options.debug || false;
+ this.connection = child.stdin;
+ this.options = options || {};
+ this.transport = this.options.transport || ttransport.TFramedTransport;
+ this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.offline_queue = [];
+
+ if(this._debug === true){
+
+ this.child.stderr.on('data',function(err){
+ console.log(err.toString(),'CHILD ERROR');
+
+ });
+
+ this.child.on('exit',function(code,signal){
+ console.log(code+':'+signal,'CHILD EXITED');
+
+ });
+
+ }
+
+ this.frameLeft = 0;
+ this.framePos = 0;
+ this.frame = null;
+ this.connected = true;
+
+ self.offline_queue.forEach(function(data) {
+ self.connection.write(data);
+ });
+
+
+ this.connection.addListener("error", function(err) {
+ self.emit("error", err);
+ });
+
+ // Add a close listener
+ this.connection.addListener("close", function() {
+ self.emit("close");
+ });
+
+ child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) {
+ var message = new self.protocol(transport_with_data);
+ try {
+ var header = message.readMessageBegin();
+ var dummy_seqid = header.rseqid * -1;
+ var client = self.client;
+ client._reqs[dummy_seqid] = function(err, success){
+ transport_with_data.commitPosition();
+
+ var callback = client._reqs[header.rseqid];
+ delete client._reqs[header.rseqid];
+ if (callback) {
+ callback(err, success);
+ }
+ };
+ client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ }
+ else {
+ throw e;
+ }
+ }
+ }));
+
+};
+
+sys.inherits(StdIOConnection, EventEmitter);
+
+StdIOConnection.prototype.end = function() {
+ this.connection.end();
+}
+
+StdIOConnection.prototype.write = function(data) {
+ if (!this.connected) {
+ this.offline_queue.push(data);
+ return;
+ }
+ this.connection.write(data);
+}
+exports.createStdIOConnection = function(command,options){
+ return new StdIOConnection(command,options);
+
+};
+
+exports.createStdIOClient = function(cls,connection) {
+ if (cls.Client) {
+ cls = cls.Client;
+ }
+
+ var client = new cls(new connection.transport(undefined, function(buf) {
+ connection.write(buf);
+ }), connection.protocol);
+
+ // TODO clean this up
+ connection.client = client;
+
+ return client;
+}
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index f4fa3cd..c7cedc0 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -22,5 +22,7 @@
exports.Connection = connection.Connection;
exports.createClient = connection.createClient;
exports.createConnection = connection.createConnection;
+exports.createStdIOClient = connection.createStdIOClient;
+exports.createStdIOConnection = connection.createStdIOConnection;
exports.createServer = require('./server').createServer;