-- Protocol and transport factories now wrap around a single protocol/transport
Summary:
- This is an analagous to the C++ change made in r31441
Reviewed By: slee
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664978 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/protocol/TBinaryProtocol.java b/lib/java/src/protocol/TBinaryProtocol.java
index b6431bf..08f6e55 100644
--- a/lib/java/src/protocol/TBinaryProtocol.java
+++ b/lib/java/src/protocol/TBinaryProtocol.java
@@ -14,18 +14,16 @@
* Factory
*/
public static class Factory implements TProtocolFactory {
- public TProtocol[] getIOProtocols(TTransport in, TTransport out) {
- TProtocol[] io = new TProtocol[2];
- io[0] = io[1] = new TBinaryProtocol(in, out);
- return io;
+ public TProtocol getProtocol(TTransport trans) {
+ return new TBinaryProtocol(trans);
}
}
/**
* Constructor
*/
- public TBinaryProtocol(TTransport in, TTransport out) {
- super(in, out);
+ public TBinaryProtocol(TTransport trans) {
+ super(trans);
}
public void writeMessageBegin(TMessage message) throws TException {
@@ -80,14 +78,14 @@
private byte [] bout = new byte[1];
public void writeByte(byte b) throws TException {
bout[0] = b;
- outputTransport_.write(bout, 0, 1);
+ trans_.write(bout, 0, 1);
}
private byte[] i16out = new byte[2];
public void writeI16(short i16) throws TException {
i16out[0] = (byte)(0xff & (i16 >> 8));
i16out[1] = (byte)(0xff & (i16));
- outputTransport_.write(i16out, 0, 2);
+ trans_.write(i16out, 0, 2);
}
private byte[] i32out = new byte[4];
@@ -96,7 +94,7 @@
i32out[1] = (byte)(0xff & (i32 >> 16));
i32out[2] = (byte)(0xff & (i32 >> 8));
i32out[3] = (byte)(0xff & (i32));
- outputTransport_.write(i32out, 0, 4);
+ trans_.write(i32out, 0, 4);
}
private byte[] i64out = new byte[8];
@@ -109,7 +107,7 @@
i64out[5] = (byte)(0xff & (i64 >> 16));
i64out[6] = (byte)(0xff & (i64 >> 8));
i64out[7] = (byte)(0xff & (i64));
- outputTransport_.write(i64out, 0, 8);
+ trans_.write(i64out, 0, 8);
}
public void writeDouble(double dub) throws TException {
@@ -119,7 +117,7 @@
public void writeString(String str) throws TException {
byte[] dat = str.getBytes();
writeI32(dat.length);
- outputTransport_.write(dat, 0, dat.length);
+ trans_.write(dat, 0, dat.length);
}
/**
@@ -187,13 +185,13 @@
private byte[] bin = new byte[1];
public byte readByte() throws TException {
- inputTransport_.readAll(bin, 0, 1);
+ trans_.readAll(bin, 0, 1);
return bin[0];
}
private byte[] i16rd = new byte[2];
public short readI16() throws TException {
- inputTransport_.readAll(i16rd, 0, 2);
+ trans_.readAll(i16rd, 0, 2);
return
(short)
(((i16rd[0] & 0xff) << 8) |
@@ -202,7 +200,7 @@
private byte[] i32rd = new byte[4];
public int readI32() throws TException {
- inputTransport_.readAll(i32rd, 0, 4);
+ trans_.readAll(i32rd, 0, 4);
return
((i32rd[0] & 0xff) << 24) |
((i32rd[1] & 0xff) << 16) |
@@ -212,7 +210,7 @@
private byte[] i64rd = new byte[8];
public long readI64() throws TException {
- inputTransport_.readAll(i64rd, 0, 8);
+ trans_.readAll(i64rd, 0, 8);
return
((long)(i64rd[0] & 0xff) << 56) |
((long)(i64rd[1] & 0xff) << 48) |
@@ -231,7 +229,7 @@
public String readString() throws TException {
int size = readI32();
byte[] buf = new byte[size];
- inputTransport_.readAll(buf, 0, size);
+ trans_.readAll(buf, 0, size);
return new String(buf);
}
}
diff --git a/lib/java/src/protocol/TProtocol.java b/lib/java/src/protocol/TProtocol.java
index 43b4f07..502f473 100644
--- a/lib/java/src/protocol/TProtocol.java
+++ b/lib/java/src/protocol/TProtocol.java
@@ -16,35 +16,22 @@
private TProtocol() {}
/**
- * Input transport
+ * Transport
*/
- protected TTransport inputTransport_;
-
- /**
- * Output transport
- */
- protected TTransport outputTransport_;
+ protected TTransport trans_;
/**
* Constructor
*/
- protected TProtocol(TTransport in, TTransport out) {
- inputTransport_ = in;
- outputTransport_ = out;
+ protected TProtocol(TTransport trans) {
+ trans_ = trans;
}
/**
- * Input accessor
+ * Transport accessor
*/
- public TTransport getInputTransport() {
- return inputTransport_;
- }
-
- /**
- * Output accessor
- */
- public TTransport getOutputTransport() {
- return outputTransport_;
+ public TTransport getTransport() {
+ return trans_;
}
/**
diff --git a/lib/java/src/protocol/TProtocolFactory.java b/lib/java/src/protocol/TProtocolFactory.java
index 7604b12..9ca3c9e 100644
--- a/lib/java/src/protocol/TProtocolFactory.java
+++ b/lib/java/src/protocol/TProtocolFactory.java
@@ -3,11 +3,11 @@
import com.facebook.thrift.transport.TTransport;
/**
- * Factory interface for constructing protocol encoder/decoder pair from an
- * input and output transport.
+ * Factory interface for constructing protocol instances.
*
* @author Mark Slee <mcslee@facebook.com>
+ * @author Aditya Agarwal <aditya@facebook.com>
*/
public interface TProtocolFactory {
- public TProtocol[] getIOProtocols(TTransport in, TTransport out);
+ public TProtocol getProtocol(TTransport trans);
}
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 5ef96d0..8e21e11 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -24,14 +24,24 @@
protected TServerTransport serverTransport_;
/**
- * Transport Factory
+ * Input Transport Factory
*/
- protected TTransportFactory transportFactory_;
+ protected TTransportFactory inputTransportFactory_;
/**
- * Protocol Factory
+ * Output Transport Factory
*/
- protected TProtocolFactory protocolFactory_;
+ protected TTransportFactory outputTransportFactory_;
+
+ /**
+ * Input Protocol Factory
+ */
+ protected TProtocolFactory inputProtocolFactory_;
+
+ /**
+ * Output Protocol Factory
+ */
+ protected TProtocolFactory outputProtocolFactory_;
/**
* Default constructors.
@@ -40,8 +50,10 @@
protected TServer(TProcessor processor,
TServerTransport serverTransport) {
this(processor,
- serverTransport,
+ serverTransport,
new TTransportFactory(),
+ new TTransportFactory(),
+ new TBinaryProtocol.Factory(),
new TBinaryProtocol.Factory());
}
@@ -51,6 +63,8 @@
this(processor,
serverTransport,
transportFactory,
+ transportFactory,
+ new TBinaryProtocol.Factory(),
new TBinaryProtocol.Factory());
}
@@ -58,10 +72,26 @@
TServerTransport serverTransport,
TTransportFactory transportFactory,
TProtocolFactory protocolFactory) {
+ this(processor,
+ serverTransport,
+ transportFactory,
+ transportFactory,
+ protocolFactory,
+ protocolFactory);
+ }
+
+ protected TServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory) {
processor_ = processor;
serverTransport_ = serverTransport;
- transportFactory_ = transportFactory;
- protocolFactory_ = protocolFactory;
+ inputTransportFactory_ = inputTransportFactory;
+ outputTransportFactory_ = outputTransportFactory;
+ inputProtocolFactory_ = inputProtocolFactory;
+ outputProtocolFactory_ = outputProtocolFactory;
}
/**
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 7ecd347..7058295 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -6,6 +6,7 @@
import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TTransportFactory;
import com.facebook.thrift.transport.TTransportException;
/**
@@ -20,6 +21,24 @@
super(processor, serverTransport);
}
+ public TSimpleServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory) {
+ super(processor, serverTransport, transportFactory, protocolFactory);
+ }
+
+ public TSimpleServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory) {
+ super(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ }
+
public void serve() {
try {
serverTransport_.listen();
@@ -30,14 +49,18 @@
while (true) {
TTransport client = null;
- TTransport[] iot = null;
- TProtocol[] iop = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
try {
client = serverTransport_.accept();
if (client != null) {
- iot = transportFactory_.getIOTransports(client);
- iop = protocolFactory_.getIOProtocols(iot[0], iot[1]);
- while (processor_.process(iop[0], iop[1]));
+ inputTransport = inputTransportFactory_.getTransport(client);
+ outputTransport = outputTransportFactory_.getTransport(client);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ while (processor_.process(inputProtocol, outputProtocol)) {}
}
} catch (TTransportException ttx) {
// Client died, just move on
@@ -47,14 +70,14 @@
x.printStackTrace();
}
- if (iot != null) {
- if (iot[0] != null) {
- iot[0].close();
- }
- if (iot[1] != null) {
- iot[1].close();
- }
+ if (inputTransport != null) {
+ inputTransport.close();
}
+
+ if (outputTransport != null) {
+ outputTransport.close();
+ }
+
}
}
}
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index 090859d..560a3cc 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -4,6 +4,7 @@
import com.facebook.thrift.TProcessor;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
@@ -34,13 +35,32 @@
public TThreadPoolServer(TProcessor processor,
TServerTransport serverTransport) {
- this(processor, serverTransport, new Options());
+ this(processor, serverTransport,
+ new TTransportFactory(), new TTransportFactory(),
+ new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+ new Options());
+ }
+
+ public TThreadPoolServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory) {
+ this(processor, serverTransport,
+ transportFactory, transportFactory,
+ protocolFactory, protocolFactory,
+ new Options());
}
public TThreadPoolServer(TProcessor processor,
TServerTransport serverTransport,
+ TTransportFactory inputTransportFactory,
+ TTransportFactory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
Options options) {
- super(processor, serverTransport);
+ super(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
executorService_ = null;
@@ -96,12 +116,16 @@
* Loops on processing a client forever
*/
public void run() {
- TTransport[] iot = null;
- TProtocol[] iop = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
try {
- iot = transportFactory_.getIOTransports(client_);
- iop = protocolFactory_.getIOProtocols(iot[0], iot[1]);
- while (processor_.process(iop[0], iop[1])) {}
+ inputTransport = inputTransportFactory_.getTransport(client_);
+ outputTransport = outputTransportFactory_.getTransport(client_);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ while (processor_.process(inputProtocol, outputProtocol)) {}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
@@ -110,13 +134,12 @@
x.printStackTrace();
}
- if (iot != null) {
- if (iot[0] != null) {
- iot[0].close();
- }
- if (iot[1] != null) {
- iot[1].close();
- }
+ if (inputTransport != null) {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null) {
+ outputTransport.close();
}
}
}
diff --git a/lib/java/src/transport/TTransportFactory.java b/lib/java/src/transport/TTransportFactory.java
index 4ba2c28..5654b1f 100644
--- a/lib/java/src/transport/TTransportFactory.java
+++ b/lib/java/src/transport/TTransportFactory.java
@@ -1,25 +1,24 @@
package com.facebook.thrift.transport;
/**
- * Factory class used to create an input and output transport out of a simple
- * transport. This is used primarily in servers, which get Transports from
- * a ServerTransport and then may want to mutate them.
+ * Factory class used to create wrapped instance of Transports.
+ * This is used primarily in servers, which get Transports from
+ * a ServerTransport and then may want to mutate them (i.e. create
+ * a BufferedTransport from the underlying base transport)
*
* @author Mark Slee <mcslee@facebook.com>
+ * @author Aditya Agarwal <aditya@facebook.com>
*/
public class TTransportFactory {
/**
- * Returns a list of two transports (input, output) from a simple
- * Transport.
+ * Return a wrapped instance of the base Transport.
*
* @param in The base transport
- * @returns Array of two transports, first for input, second for output
+ * @returns Wrapped Transport
*/
- public TTransport[] getIOTransports(TTransport in) {
- TTransport[] out = new TTransport[2];
- out[0] = out[1] = in;
- return out;
+ public TTransport getTransport(TTransport trans) {
+ return trans;
}
}