New protocol wrapping transport model for Thrift Java
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664846 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/TProcessor.java b/lib/java/src/TProcessor.java
index dd6ae85..08817ee 100644
--- a/lib/java/src/TProcessor.java
+++ b/lib/java/src/TProcessor.java
@@ -1,6 +1,6 @@
package com.facebook.thrift;
-import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.protocol.TProtocol;
/**
* A processor is a generic object which operates upon an input stream and
@@ -9,6 +9,6 @@
* @author Mark Slee <mcslee@facebook.com>
*/
public interface TProcessor {
- public boolean process(TTransport in, TTransport out)
+ public boolean process(TProtocol in, TProtocol out)
throws TException;
}
diff --git a/lib/java/src/protocol/TBinaryProtocol.java b/lib/java/src/protocol/TBinaryProtocol.java
index 61bac08..b6431bf 100644
--- a/lib/java/src/protocol/TBinaryProtocol.java
+++ b/lib/java/src/protocol/TBinaryProtocol.java
@@ -8,82 +8,99 @@
*
* @author Mark Slee <mcslee@facebook.com>
*/
-public class TBinaryProtocol implements TProtocol {
+public class TBinaryProtocol extends TProtocol {
- public void writeMessageBegin(TTransport out, TMessage message) throws TException {
- writeString(out, message.name);
- writeByte(out, message.type);
- writeI32(out, message.seqid);
+ /**
+ * Factory
+ */
+ public static class Factory implements TProtocolFactory {
+ public TProtocol[] getIOProtocols(TTransport in, TTransport out) {
+ TProtocol[] io = new TProtocol[2];
+ io[0] = io[1] = new TBinaryProtocol(in, out);
+ return io;
+ }
}
- public void writeMessageEnd(TTransport out) throws TException {}
-
-
- public void writeStructBegin(TTransport out, TStruct struct) throws TException {}
-
- public void writeStructEnd(TTransport out) throws TException {}
-
- public void writeFieldBegin(TTransport out, TField field) throws TException {
- writeByte(out, field.type);
- writeI16(out, field.id);
+ /**
+ * Constructor
+ */
+ public TBinaryProtocol(TTransport in, TTransport out) {
+ super(in, out);
}
- public void writeFieldEnd(TTransport out) throws TException {}
-
- public void writeFieldStop(TTransport out) throws TException {
- writeByte(out, TType.STOP);
+ public void writeMessageBegin(TMessage message) throws TException {
+ writeString(message.name);
+ writeByte(message.type);
+ writeI32(message.seqid);
}
- public void writeMapBegin(TTransport out, TMap map) throws TException {
- writeByte(out, map.keyType);
- writeByte(out, map.valueType);
- writeI32(out, map.size);
+ public void writeMessageEnd() {}
+
+ public void writeStructBegin(TStruct struct) {}
+
+ public void writeStructEnd() {}
+
+ public void writeFieldBegin(TField field) throws TException {
+ writeByte(field.type);
+ writeI16(field.id);
}
- public void writeMapEnd(TTransport out) throws TException {}
+ public void writeFieldEnd() {}
- public void writeListBegin(TTransport out, TList list) throws TException {
- writeByte(out, list.elemType);
- writeI32(out, list.size);
+ public void writeFieldStop() throws TException {
+ writeByte(TType.STOP);
}
- public void writeListEnd(TTransport out) throws TException {}
-
- public void writeSetBegin(TTransport out, TSet set) throws TException {
- writeByte(out, set.elemType);
- writeI32(out, set.size);
+ public void writeMapBegin(TMap map) throws TException {
+ writeByte(map.keyType);
+ writeByte(map.valueType);
+ writeI32(map.size);
}
- public void writeSetEnd(TTransport out) throws TException {}
+ public void writeMapEnd() {}
- public void writeBool(TTransport out, boolean b) throws TException {
- writeByte(out, b ? (byte)1 : (byte)0);
+ public void writeListBegin(TList list) throws TException {
+ writeByte(list.elemType);
+ writeI32(list.size);
}
- public void writeByte(TTransport out, byte b) throws TException {
- byte[] bout = new byte[1];
+ public void writeListEnd() {}
+
+ public void writeSetBegin(TSet set) throws TException {
+ writeByte(set.elemType);
+ writeI32(set.size);
+ }
+
+ public void writeSetEnd() {}
+
+ public void writeBool(boolean b) throws TException {
+ writeByte(b ? (byte)1 : (byte)0);
+ }
+
+ private byte [] bout = new byte[1];
+ public void writeByte(byte b) throws TException {
bout[0] = b;
- out.write(bout, 0, 1);
+ outputTransport_.write(bout, 0, 1);
}
- public void writeI16(TTransport out, short i16) throws TException {
- byte[] i16out = new byte[2];
+ private byte[] i16out = new byte[2];
+ public void writeI16(short i16) throws TException {
i16out[0] = (byte)(0xff & (i16 >> 8));
i16out[1] = (byte)(0xff & (i16));
- out.write(i16out, 0, 2);
+ outputTransport_.write(i16out, 0, 2);
}
- public void writeI32(TTransport out, int i32) throws TException {
- byte[] i32out = new byte[4];
+ private byte[] i32out = new byte[4];
+ public void writeI32(int i32) throws TException {
i32out[0] = (byte)(0xff & (i32 >> 24));
i32out[1] = (byte)(0xff & (i32 >> 16));
i32out[2] = (byte)(0xff & (i32 >> 8));
i32out[3] = (byte)(0xff & (i32));
- out.write(i32out, 0, 4);
+ outputTransport_.write(i32out, 0, 4);
}
- public void writeI64(TTransport out, long i64) throws TException {
- byte[] i64out = new byte[8];
+ private byte[] i64out = new byte[8];
+ public void writeI64(long i64) throws TException {
i64out[0] = (byte)(0xff & (i64 >> 56));
i64out[1] = (byte)(0xff & (i64 >> 48));
i64out[2] = (byte)(0xff & (i64 >> 40));
@@ -92,100 +109,100 @@
i64out[5] = (byte)(0xff & (i64 >> 16));
i64out[6] = (byte)(0xff & (i64 >> 8));
i64out[7] = (byte)(0xff & (i64));
- out.write(i64out, 0, 8);
+ outputTransport_.write(i64out, 0, 8);
}
- public void writeDouble(TTransport out, double dub) throws TException {
- writeI64(out, Double.doubleToLongBits(dub));
+ public void writeDouble(double dub) throws TException {
+ writeI64(Double.doubleToLongBits(dub));
}
- public void writeString(TTransport out, String str) throws TException {
+ public void writeString(String str) throws TException {
byte[] dat = str.getBytes();
- writeI32(out, dat.length);
- out.write(dat, 0, dat.length);
+ writeI32(dat.length);
+ outputTransport_.write(dat, 0, dat.length);
}
/**
* Reading methods.
*/
- public TMessage readMessageBegin(TTransport in) throws TException {
+ public TMessage readMessageBegin() throws TException {
TMessage message = new TMessage();
- message.name = readString(in);
- message.type = readByte(in);
- message.seqid = readI32(in);
+ message.name = readString();
+ message.type = readByte();
+ message.seqid = readI32();
return message;
}
- public void readMessageEnd(TTransport in) throws TException {}
+ public void readMessageEnd() {}
- public TStruct readStructBegin(TTransport in) throws TException {
+ public TStruct readStructBegin() {
return new TStruct();
}
- public void readStructEnd(TTransport in) throws TException {}
+ public void readStructEnd() {}
- public TField readFieldBegin(TTransport in) throws TException {
+ public TField readFieldBegin() throws TException {
TField field = new TField();
- field.type = readByte(in);
+ field.type = readByte();
if (field.type != TType.STOP) {
- field.id = readI16(in);
+ field.id = readI16();
}
return field;
}
- public void readFieldEnd(TTransport in) throws TException {}
+ public void readFieldEnd() {}
- public TMap readMapBegin(TTransport in) throws TException {
+ public TMap readMapBegin() throws TException {
TMap map = new TMap();
- map.keyType = readByte(in);
- map.valueType = readByte(in);
- map.size = readI32(in);
+ map.keyType = readByte();
+ map.valueType = readByte();
+ map.size = readI32();
return map;
}
- public void readMapEnd(TTransport in) throws TException {}
+ public void readMapEnd() {}
- public TList readListBegin(TTransport in) throws TException {
+ public TList readListBegin() throws TException {
TList list = new TList();
- list.elemType = readByte(in);
- list.size = readI32(in);
+ list.elemType = readByte();
+ list.size = readI32();
return list;
}
- public void readListEnd(TTransport in) throws TException {}
+ public void readListEnd() {}
- public TSet readSetBegin(TTransport in) throws TException {
+ public TSet readSetBegin() throws TException {
TSet set = new TSet();
- set.elemType = readByte(in);
- set.size = readI32(in);
+ set.elemType = readByte();
+ set.size = readI32();
return set;
}
- public void readSetEnd(TTransport in) throws TException {}
+ public void readSetEnd() {}
- public boolean readBool(TTransport in) throws TException {
- return (readByte(in) == 1);
+ public boolean readBool() throws TException {
+ return (readByte() == 1);
}
- public byte readByte(TTransport in) throws TException {
- byte[] bin = new byte[1];
- in.readAll(bin, 0, 1);
+ private byte[] bin = new byte[1];
+ public byte readByte() throws TException {
+ inputTransport_.readAll(bin, 0, 1);
return bin[0];
}
- public short readI16(TTransport in) throws TException {
- byte[] i16rd = new byte[2];
- in.readAll(i16rd, 0, 2);
+ private byte[] i16rd = new byte[2];
+ public short readI16() throws TException {
+ inputTransport_.readAll(i16rd, 0, 2);
return
(short)
(((i16rd[0] & 0xff) << 8) |
((i16rd[1] & 0xff)));
}
- public int readI32(TTransport in) throws TException {
- byte[] i32rd = new byte[4];
- in.readAll(i32rd, 0, 4);
+ private byte[] i32rd = new byte[4];
+ public int readI32() throws TException {
+ inputTransport_.readAll(i32rd, 0, 4);
return
((i32rd[0] & 0xff) << 24) |
((i32rd[1] & 0xff) << 16) |
@@ -193,9 +210,9 @@
((i32rd[3] & 0xff));
}
- public long readI64(TTransport in) throws TException {
- byte[] i64rd = new byte[8];
- in.readAll(i64rd, 0, 8);
+ private byte[] i64rd = new byte[8];
+ public long readI64() throws TException {
+ inputTransport_.readAll(i64rd, 0, 8);
return
((long)(i64rd[0] & 0xff) << 56) |
((long)(i64rd[1] & 0xff) << 48) |
@@ -207,14 +224,14 @@
((long)(i64rd[7] & 0xff));
}
- public double readDouble(TTransport in) throws TException {
- return Double.longBitsToDouble(readI64(in));
+ public double readDouble() throws TException {
+ return Double.longBitsToDouble(readI64());
}
- public String readString(TTransport in) throws TException {
- int size = readI32(in);
+ public String readString() throws TException {
+ int size = readI32();
byte[] buf = new byte[size];
- in.readAll(buf, 0, size);
+ inputTransport_.readAll(buf, 0, size);
return new String(buf);
}
}
diff --git a/lib/java/src/protocol/TProtocol.java b/lib/java/src/protocol/TProtocol.java
index 0831d12..43b4f07 100644
--- a/lib/java/src/protocol/TProtocol.java
+++ b/lib/java/src/protocol/TProtocol.java
@@ -8,106 +8,129 @@
*
* @author Mark Slee <mcslee@facebook.com>
*/
-public interface TProtocol {
+public abstract class TProtocol {
+
+ /**
+ * Prevent direct instantiation
+ */
+ private TProtocol() {}
+
+ /**
+ * Input transport
+ */
+ protected TTransport inputTransport_;
+
+ /**
+ * Output transport
+ */
+ protected TTransport outputTransport_;
+
+ /**
+ * Constructor
+ */
+ protected TProtocol(TTransport in, TTransport out) {
+ inputTransport_ = in;
+ outputTransport_ = out;
+ }
+
+ /**
+ * Input accessor
+ */
+ public TTransport getInputTransport() {
+ return inputTransport_;
+ }
+
+ /**
+ * Output accessor
+ */
+ public TTransport getOutputTransport() {
+ return outputTransport_;
+ }
/**
* Writing methods.
*/
- public void writeMessageBegin(TTransport out,
- TMessage message) throws TException;
+ public abstract void writeMessageBegin(TMessage message) throws TException;
- public void writeMessageEnd (TTransport out) throws TException;
+ public abstract void writeMessageEnd() throws TException;
- public void writeStructBegin (TTransport out,
- TStruct struct) throws TException;
+ public abstract void writeStructBegin(TStruct struct) throws TException;
- public void writeStructEnd (TTransport out) throws TException;
+ public abstract void writeStructEnd() throws TException;
- public void writeFieldBegin (TTransport out,
- TField field) throws TException;
+ public abstract void writeFieldBegin(TField field) throws TException;
- public void writeFieldEnd (TTransport out) throws TException;
+ public abstract void writeFieldEnd() throws TException;
- public void writeFieldStop (TTransport out) throws TException;
+ public abstract void writeFieldStop() throws TException;
- public void writeMapBegin (TTransport out,
- TMap map) throws TException;
+ public abstract void writeMapBegin(TMap map) throws TException;
- public void writeMapEnd (TTransport out) throws TException;
+ public abstract void writeMapEnd() throws TException;
- public void writeListBegin (TTransport out,
- TList list) throws TException;
+ public abstract void writeListBegin(TList list) throws TException;
- public void writeListEnd (TTransport out) throws TException;
+ public abstract void writeListEnd() throws TException;
- public void writeSetBegin (TTransport out,
- TSet set) throws TException;
+ public abstract void writeSetBegin(TSet set) throws TException;
- public void writeSetEnd (TTransport out) throws TException;
+ public abstract void writeSetEnd() throws TException;
- public void writeBool (TTransport out,
- boolean b) throws TException;
+ public abstract void writeBool(boolean b) throws TException;
- public void writeByte (TTransport out,
- byte b) throws TException;
+ public abstract void writeByte(byte b) throws TException;
- public void writeI16 (TTransport out,
- short i16) throws TException;
+ public abstract void writeI16(short i16) throws TException;
- public void writeI32 (TTransport out,
- int i32) throws TException;
+ public abstract void writeI32(int i32) throws TException;
- public void writeI64 (TTransport out,
- long i64) throws TException;
+ public abstract void writeI64(long i64) throws TException;
- public void writeDouble (TTransport out,
- double dub) throws TException;
+ public abstract void writeDouble(double dub) throws TException;
-
- public void writeString (TTransport out,
- String str) throws TException;
+ public abstract void writeString(String str) throws TException;
/**
* Reading methods.
*/
- public TMessage readMessageBegin (TTransport in) throws TException;
+ public abstract TMessage readMessageBegin() throws TException;
- public void readMessageEnd (TTransport in) throws TException;
+ public abstract void readMessageEnd() throws TException;
- public TStruct readStructBegin (TTransport in) throws TException;
+ public abstract TStruct readStructBegin() throws TException;
- public void readStructEnd (TTransport in) throws TException;
+ public abstract void readStructEnd() throws TException;
- public TField readFieldBegin (TTransport in) throws TException;
+ public abstract TField readFieldBegin() throws TException;
- public void readFieldEnd (TTransport in) throws TException;
+ public abstract void readFieldEnd() throws TException;
- public TMap readMapBegin (TTransport in) throws TException;
+ public abstract TMap readMapBegin() throws TException;
- public void readMapEnd (TTransport in) throws TException;
+ public abstract void readMapEnd() throws TException;
- public TList readListBegin (TTransport in) throws TException;
+ public abstract TList readListBegin() throws TException;
- public void readListEnd (TTransport in) throws TException;
+ public abstract void readListEnd() throws TException;
- public TSet readSetBegin (TTransport in) throws TException;
+ public abstract TSet readSetBegin() throws TException;
- public void readSetEnd (TTransport in) throws TException;
+ public abstract void readSetEnd() throws TException;
- public boolean readBool (TTransport in) throws TException;
+ public abstract boolean readBool() throws TException;
- public byte readByte (TTransport in) throws TException;
+ public abstract byte readByte() throws TException;
- public short readI16 (TTransport in) throws TException;
+ public abstract short readI16() throws TException;
- public int readI32 (TTransport in) throws TException;
+ public abstract int readI32() throws TException;
- public long readI64 (TTransport in) throws TException;
+ public abstract long readI64() throws TException;
- public double readDouble (TTransport in) throws TException;
+ public abstract double readDouble() throws TException;
- public String readString (TTransport in) throws TException;
+ public abstract String readString() throws TException;
}
diff --git a/lib/java/src/protocol/TProtocolFactory.java b/lib/java/src/protocol/TProtocolFactory.java
new file mode 100644
index 0000000..7604b12
--- /dev/null
+++ b/lib/java/src/protocol/TProtocolFactory.java
@@ -0,0 +1,13 @@
+package com.facebook.thrift.protocol;
+
+import com.facebook.thrift.transport.TTransport;
+
+/**
+ * Factory interface for constructing protocol encoder/decoder pair from an
+ * input and output transport.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public interface TProtocolFactory {
+ public TProtocol[] getIOProtocols(TTransport in, TTransport out);
+}
diff --git a/lib/java/src/protocol/TProtocolUtil.java b/lib/java/src/protocol/TProtocolUtil.java
index 1c88f8a..c83950d 100644
--- a/lib/java/src/protocol/TProtocolUtil.java
+++ b/lib/java/src/protocol/TProtocolUtil.java
@@ -10,75 +10,75 @@
* @author Mark Slee <mcslee@facebook.com>
*/
public class TProtocolUtil {
- public static void skip(TProtocol prot, TTransport in, byte type)
+ public static void skip(TProtocol prot, byte type)
throws TException {
switch (type) {
case TType.BOOL:
{
- prot.readBool(in);
+ prot.readBool();
}
case TType.BYTE:
{
- prot.readByte(in);
+ prot.readByte();
}
case TType.I16:
{
- prot.readI16(in);
+ prot.readI16();
}
case TType.I32:
{
- prot.readI32(in);
+ prot.readI32();
}
case TType.I64:
{
- prot.readI64(in);
+ prot.readI64();
}
case TType.DOUBLE:
{
- prot.readDouble(in);
+ prot.readDouble();
}
case TType.STRING:
{
- prot.readString(in);
+ prot.readString();
}
case TType.STRUCT:
{
- prot.readStructBegin(in);
+ prot.readStructBegin();
while (true) {
- TField field = prot.readFieldBegin(in);
+ TField field = prot.readFieldBegin();
if (field.type == TType.STOP) {
break;
}
- skip(prot, in, field.type);
- prot.readFieldEnd(in);
+ skip(prot, field.type);
+ prot.readFieldEnd();
}
- prot.readStructEnd(in);
+ prot.readStructEnd();
}
case TType.MAP:
{
- TMap map = prot.readMapBegin(in);
+ TMap map = prot.readMapBegin();
for (int i = 0; i < map.size; i++) {
- skip(prot, in, map.keyType);
- skip(prot, in, map.valueType);
+ skip(prot, map.keyType);
+ skip(prot, map.valueType);
}
- prot.readMapEnd(in);
+ prot.readMapEnd();
}
case TType.SET:
{
- TSet set = prot.readSetBegin(in);
+ TSet set = prot.readSetBegin();
for (int i = 0; i < set.size; i++) {
- skip(prot, in, set.elemType);
+ skip(prot, set.elemType);
}
- prot.readSetEnd(in);
+ prot.readSetEnd();
}
case TType.LIST:
{
- TList list = prot.readListBegin(in);
+ TList list = prot.readListBegin();
for (int i = 0; i < list.size; i++) {
- skip(prot, in, list.elemType);
+ skip(prot, list.elemType);
}
- prot.readListEnd(in);
+ prot.readListEnd();
}
default:
return;
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 3cae00b..5ef96d0 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -1,9 +1,10 @@
package com.facebook.thrift.server;
import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;
-import com.facebook.thrift.transport.TBaseTransportFactory;
/**
* Generic interface for a Thrift server.
@@ -13,24 +14,11 @@
public abstract class TServer {
/**
- * The options class should be subclassed by particular servers which have
- * specific options needs, while the general options should live here.
- */
- public static class Options {
- public Options() {}
- }
-
- /**
* Core processor
*/
protected TProcessor processor_;
/**
- * Server options
- */
- protected Options options_;
-
- /**
* Server transport
*/
protected TServerTransport serverTransport_;
@@ -41,6 +29,11 @@
protected TTransportFactory transportFactory_;
/**
+ * Protocol Factory
+ */
+ protected TProtocolFactory protocolFactory_;
+
+ /**
* Default constructors.
*/
@@ -48,8 +41,8 @@
TServerTransport serverTransport) {
this(processor,
serverTransport,
- new TBaseTransportFactory(),
- new Options());
+ new TTransportFactory(),
+ new TBinaryProtocol.Factory());
}
protected TServer(TProcessor processor,
@@ -58,31 +51,22 @@
this(processor,
serverTransport,
transportFactory,
- new Options());
- }
-
-
- protected TServer(TProcessor processor,
- TServerTransport serverTransport,
- Options options) {
- this(processor,
- serverTransport,
- new TBaseTransportFactory(),
- options);
+ new TBinaryProtocol.Factory());
}
protected TServer(TProcessor processor,
TServerTransport serverTransport,
TTransportFactory transportFactory,
- Options options) {
+ TProtocolFactory protocolFactory) {
processor_ = processor;
serverTransport_ = serverTransport;
transportFactory_ = transportFactory;
- options_ = options;
+ protocolFactory_ = protocolFactory;
}
/**
* The run method fires up the server and gets things going.
*/
public abstract void serve();
+
}
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 548ca09..7ecd347 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -2,6 +2,8 @@
import com.facebook.thrift.TException;
import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
@@ -28,12 +30,14 @@
while (true) {
TTransport client = null;
- TTransport[] io = null;
+ TTransport[] iot = null;
+ TProtocol[] iop = null;
try {
client = serverTransport_.accept();
if (client != null) {
- io = transportFactory_.getIOTransports(client);
- while (processor_.process(io[0], io[1]));
+ iot = transportFactory_.getIOTransports(client);
+ iop = protocolFactory_.getIOProtocols(iot[0], iot[1]);
+ while (processor_.process(iop[0], iop[1]));
}
} catch (TTransportException ttx) {
// Client died, just move on
@@ -43,12 +47,12 @@
x.printStackTrace();
}
- if (io != null) {
- if (io[0] != null) {
- io[0].close();
+ if (iot != null) {
+ if (iot[0] != null) {
+ iot[0].close();
}
- if (io[1] != null) {
- io[1].close();
+ if (iot[1] != null) {
+ iot[1].close();
}
}
}
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index c63d1e1..090859d 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -2,11 +2,12 @@
import com.facebook.thrift.TException;
import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.transport.TTransportFactory;
-import com.facebook.thrift.transport.TBaseTransportFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -26,25 +27,21 @@
private ExecutorService executorService_;
// Customizable server options
- public static class Options extends TServer.Options {
+ public static class Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
}
public TThreadPoolServer(TProcessor processor,
TServerTransport serverTransport) {
- this(processor,
- serverTransport,
- new TBaseTransportFactory(),
- new Options());
+ this(processor, serverTransport, new Options());
}
public TThreadPoolServer(TProcessor processor,
TServerTransport serverTransport,
- TTransportFactory transportFactory,
Options options) {
- super(processor, serverTransport, transportFactory, options);
- serverTransport_ = serverTransport;
+ super(processor, serverTransport);
+
executorService_ = null;
LinkedBlockingQueue<Runnable> executorQueue =
@@ -99,10 +96,12 @@
* Loops on processing a client forever
*/
public void run() {
- TTransport[] io = null;
+ TTransport[] iot = null;
+ TProtocol[] iop = null;
try {
- io = transportFactory_.getIOTransports(client_);
- while (processor_.process(io[0], io[1])) {}
+ iot = transportFactory_.getIOTransports(client_);
+ iop = protocolFactory_.getIOProtocols(iot[0], iot[1]);
+ while (processor_.process(iop[0], iop[1])) {}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
@@ -111,12 +110,12 @@
x.printStackTrace();
}
- if (io != null) {
- if (io[0] != null) {
- io[0].close();
+ if (iot != null) {
+ if (iot[0] != null) {
+ iot[0].close();
}
- if (io[1] != null) {
- io[1].close();
+ if (iot[1] != null) {
+ iot[1].close();
}
}
}
diff --git a/lib/java/src/transport/TBaseTransportFactory.java b/lib/java/src/transport/TBaseTransportFactory.java
deleted file mode 100644
index 90bbbe1..0000000
--- a/lib/java/src/transport/TBaseTransportFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.facebook.thrift.transport;
-
-/**
- * Base transport factory just returns the arg transport.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-public class TBaseTransportFactory implements TTransportFactory {
-
- /**
- * Returns a list of two transports (input, output) from a simple
- * Transport.
- *
- * @param in The base transport
- * @returns Array of two transports, first for input, second for output
- */
- public TTransport[] getIOTransports(TTransport in) {
- TTransport[] out = new TTransport[2];
- out[0] = out[1] = in;
- return out;
- }
-
-}
diff --git a/lib/java/src/transport/TTransportFactory.java b/lib/java/src/transport/TTransportFactory.java
index 8c7a093..4ba2c28 100644
--- a/lib/java/src/transport/TTransportFactory.java
+++ b/lib/java/src/transport/TTransportFactory.java
@@ -7,7 +7,7 @@
*
* @author Mark Slee <mcslee@facebook.com>
*/
-public interface TTransportFactory {
+public class TTransportFactory {
/**
* Returns a list of two transports (input, output) from a simple
@@ -16,6 +16,10 @@
* @param in The base transport
* @returns Array of two transports, first for input, second for output
*/
- public TTransport[] getIOTransports(TTransport in);
+ public TTransport[] getIOTransports(TTransport in) {
+ TTransport[] out = new TTransport[2];
+ out[0] = out[1] = in;
+ return out;
+ }
}