// blob: 05c5a6cf36605121cf45882e6d03bfa52506b22e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
var util = require('util'),
EventEmitter = require("events").EventEmitter,
net = require('net'),
ttransport = require('./transport'),
tprotocol = require('./protocol');
var binary = require('./binary');
/**
 * Event-emitting wrapper around a duplex stream that carries Thrift messages.
 *
 * Re-emits the stream's "connect", "error", "close" and "timeout" events,
 * buffers outgoing data in `offline_queue` until the stream reports
 * "connect", and feeds incoming data to the configured transport's receiver.
 *
 * @param {Object} stream - Stream to wrap; it must emit the events above and
 *   provide `write`, `setTimeout` and `setNoDelay`.
 * @param {Object} [options] - Optional settings read here: `transport`
 *   (defaults to `ttransport.TFramedTransport`), `protocol` (defaults to
 *   `tprotocol.TBinaryProtocol`) and `timeout` (defaults to 0).
 */
var Connection = exports.Connection = function(stream, options) {
  var self = this;
  EventEmitter.call(this);

  this.connection = stream;
  this.options = options || {};
  this.transport = this.options.transport || ttransport.TFramedTransport;
  this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
  this.offline_queue = [];
  this.connected = false;

  this.connection.addListener("connect", function() {
    self.connected = true;

    // Inside this listener `this` is the stream that emitted the event,
    // so the settings and frame fields below are applied to the stream.
    this.setTimeout(self.options.timeout || 0);
    this.setNoDelay();
    this.frameLeft = 0;
    this.framePos = 0;
    this.frame = null;

    // Drain the queue in place before writing, so buffered data is released
    // and is never sent a second time if "connect" fires again.
    var pending = self.offline_queue.splice(0);
    pending.forEach(function(data) {
      self.connection.write(data);
    });

    self.emit("connect");
  });

  this.connection.addListener("error", function(err) {
    // Only emit the error if no-one else is listening on the connection
    // (a count of 1 means this listener is the only one on the stream)
    // or if someone is listening on us
    if (self.connection.listeners('error').length === 1
        || self.listeners('error').length > 0) {
      self.emit("error", err);
    }
  });

  // Forward close and timeout notifications unchanged.
  this.connection.addListener("close", function() {
    self.emit("close");
  });

  this.connection.addListener("timeout", function() {
    self.emit("timeout");
  });

  // The transport's receiver() wraps our callback into the "data" listener.
  this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
    var message = new self.protocol(transport_with_data);
    try {
      var header = message.readMessageBegin();
      // Register a temporary callback under the negated sequence id; it
      // commits the read position and then hands the result to the real
      // callback stored under the original id.
      // NOTE(review): for rseqid 0 the negated id is -0, which maps to the
      // same property key ("0") as the real id - confirm seqid 0 is never used.
      var dummy_seqid = header.rseqid * -1;
      var client = self.client;
      client._reqs[dummy_seqid] = function(err, success){
        transport_with_data.commitPosition();

        var callback = client._reqs[header.rseqid];
        delete client._reqs[header.rseqid];
        if (callback) {
          callback(err, success);
        }
      };
      // The client's recv_<fname> method is expected to look up and call
      // the temporary callback registered above.
      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
    }
    catch (e) {
      if (e instanceof ttransport.InputBufferUnderrunError) {
        // Not enough bytes for a whole message yet: rewind the read position.
        transport_with_data.rollbackPosition();
      }
      else {
        throw e;
      }
    }
  }));
};
util.inherits(Connection, EventEmitter);
/**
 * Ends the wrapped stream by calling its `end()` method.
 * Returns nothing.
 */
Connection.prototype.end = function() {
  var stream = this.connection;
  stream.end();
};
/**
 * Sends `data` over the wrapped stream, or parks it in `offline_queue`
 * while `connected` is still false.
 *
 * @param {*} data - Payload handed to the stream's `write` unchanged.
 */
Connection.prototype.write = function(data) {
  if (this.connected) {
    this.connection.write(data);
    return;
  }
  // Not connected yet: keep the payload for later.
  this.offline_queue.push(data);
};
exports.createConnection = function(host, port, options) {
var stream = net.createConnection(port, host);
var connection = new Connection(stream, options);
connection.host = host;
connection.port = port;
return connection;
}
exports.createClient = function(cls, connection) {
if (cls.Client) {
cls = cls.Client;
}
var client = new cls(new connection.transport(undefined, function(buf) {
connection.write(buf);
}), connection.protocol);
// TODO clean this up
connection.client = client;
return client;
}
var child_process = require('child_process');
/**
 * Connection that talks Thrift to a spawned child process: outgoing data is
 * written to the child's stdin and incoming data is read from its stdout.
 *
 * @param {string} command - Command line, split on single spaces; the first
 *   part is the executable and the rest are its arguments.
 * @param {Object} [options] - Optional settings read here: `debug` (log the
 *   child's stderr output and exit status to the console), `transport`
 *   (defaults to `ttransport.TFramedTransport`) and `protocol` (defaults to
 *   `tprotocol.TBinaryProtocol`).
 */
var StdIOConnection = exports.StdIOConnection = function(command, options) {
  var command_parts = command.split(' ');
  command = command_parts[0];
  var args = command_parts.slice(1);
  var child = this.child = child_process.spawn(command, args);

  var self = this;
  EventEmitter.call(this);

  // Default the options before reading any field from them, so the
  // constructor also works when `options` is omitted.
  this.options = options || {};
  this._debug = this.options.debug || false;
  this.connection = child.stdin;
  this.transport = this.options.transport || ttransport.TFramedTransport;
  this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
  this.offline_queue = [];

  if (this._debug === true) {
    this.child.stderr.on('data', function(err) {
      console.log(err.toString(),'CHILD ERROR');
    });

    this.child.on('exit', function(code, signal) {
      console.log(code+':'+signal,'CHILD EXITED');
    });
  }

  this.frameLeft = 0;
  this.framePos = 0;
  this.frame = null;
  // Marked connected immediately, so write() never queues and there is
  // nothing buffered to flush at this point.
  this.connected = true;

  this.connection.addListener("error", function(err) {
    self.emit("error", err);
  });

  // Add a close listener
  this.connection.addListener("close", function() {
    self.emit("close");
  });

  // The transport's receiver() wraps our callback into the "data" listener.
  child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) {
    var message = new self.protocol(transport_with_data);
    try {
      var header = message.readMessageBegin();
      // Register a temporary callback under the negated sequence id; it
      // commits the read position and then hands the result to the real
      // callback stored under the original id.
      // NOTE(review): for rseqid 0 the negated id is -0, which maps to the
      // same property key ("0") as the real id - confirm seqid 0 is never used.
      var dummy_seqid = header.rseqid * -1;
      var client = self.client;
      client._reqs[dummy_seqid] = function(err, success){
        transport_with_data.commitPosition();

        var callback = client._reqs[header.rseqid];
        delete client._reqs[header.rseqid];
        if (callback) {
          callback(err, success);
        }
      };
      // The client's recv_<fname> method is expected to look up and call
      // the temporary callback registered above.
      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
    }
    catch (e) {
      if (e instanceof ttransport.InputBufferUnderrunError) {
        // Not enough bytes for a whole message yet: rewind the read position.
        transport_with_data.rollbackPosition();
      }
      else {
        throw e;
      }
    }
  }));
};
util.inherits(StdIOConnection, EventEmitter);
/**
 * Ends the stream stored in `this.connection` by calling its `end()` method.
 * Returns nothing.
 */
StdIOConnection.prototype.end = function() {
  var sink = this.connection;
  sink.end();
};
/**
 * Writes `data` to the stream in `this.connection`, or appends it to
 * `offline_queue` when `connected` is false.
 *
 * @param {*} data - Payload handed to the stream's `write` unchanged.
 */
StdIOConnection.prototype.write = function(data) {
  var isOnline = this.connected;
  if (isOnline) {
    this.connection.write(data);
  } else {
    this.offline_queue.push(data);
  }
};
exports.createStdIOConnection = function(command,options){
return new StdIOConnection(command,options);
};
exports.createStdIOClient = function(cls,connection) {
if (cls.Client) {
cls = cls.Client;
}
var client = new cls(new connection.transport(undefined, function(buf) {
connection.write(buf);
}), connection.protocol);
// TODO clean this up
connection.client = client;
return client;
}