Thrift TTransportFactory model for servers

Summary: Servers need to create bufferedtransports etc. around the transports they get in a user-definable way. So use a factory pattern to allow the user to supply an object to the server that defines this behavior.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index b06e06f..554cb90 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -1,7 +1,7 @@
 lib_LTLIBRARIES = libthrift.la
 
-common_cxxflags = -Isrc $(BOOST_CPPFLAGS)
-common_ldflags = $(BOOST_LDFLAGS)
+common_cxxflags = -Wall -Isrc $(BOOST_CPPFLAGS)
+common_ldflags = -Wall $(BOOST_LDFLAGS)
 
 # Define the source file for the module
 
@@ -54,7 +54,9 @@
                          src/transport/TServerTransport.h \
                          src/transport/TSocket.h \
                          src/transport/TTransport.h \
-                         src/transport/TTransportException.h
+                         src/transport/TTransportException.h \
+                         src/transport/TTransportFactory.h \
+                         src/transport/TBufferedTransportFactory.h
 
 include_serverdir = $(include_thriftdir)/server
 include_server_HEADERS = \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 4cbcd65..f905b1d 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -23,7 +23,8 @@
  public:
   virtual ~TProcessor() {}
   virtual bool process(shared_ptr<TTransport> in, shared_ptr<TTransport> out) = 0;
-  virtual bool process(shared_ptr<TTransport> io) { return process(io, io); }
+  bool process(shared_ptr<TTransport> io) { return process(io, io); }
+
  protected:
   TProcessor() {}
 };
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index d53223f..c19302f 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -1,7 +1,9 @@
-#ifndef T_SERVER_H
-#define T_SERVER_H
+#ifndef _THRIFT_SERVER_TSERVER_H_
+#define _THRIFT_SERVER_TSERVER_H_ 1
 
 #include <TProcessor.h>
+#include <transport/TServerTransport.h>
+#include <transport/TTransportFactory.h>
 #include <concurrency/Thread.h>
 
 #include <boost/shared_ptr.hpp>
@@ -9,6 +11,7 @@
 namespace facebook { namespace thrift { namespace server { 
 
 using namespace facebook::thrift;
+using namespace facebook::thrift::transport;
 using namespace boost;
 
 class TServerOptions;
@@ -24,10 +27,22 @@
   virtual void run() = 0;
   
 protected:
-  TServer(shared_ptr<TProcessor> processor, shared_ptr<TServerOptions> options) :
+  TServer(shared_ptr<TProcessor> processor,
+          shared_ptr<TServerTransport> serverTransport,
+          shared_ptr<TTransportFactory> transportFactory,
+          shared_ptr<TServerOptions> options) :
+    processor_(processor),
+    serverTransport_(serverTransport),
+    transportFactory_(transportFactory),
+    options_(options) {}
+
+  TServer(shared_ptr<TProcessor> processor,
+          shared_ptr<TServerOptions> options) :
     processor_(processor), options_(options) {}
-  
+ 
   shared_ptr<TProcessor> processor_;
+  shared_ptr<TServerTransport> serverTransport_;
+  shared_ptr<TTransportFactory> transportFactory_;
   shared_ptr<TServerOptions> options_;
 };
   
@@ -35,12 +50,12 @@
  * Class to encapsulate all generic server options.
  */
 class TServerOptions {
-public:
+ public:
   // TODO(mcslee): Fill in getters/setters here
-protected:
+ protected:
   // TODO(mcslee): Fill data members in here
 };
 
 }}} // facebook::thrift::server
 
-#endif
+#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cc b/lib/cpp/src/server/TSimpleServer.cc
index 2ad5145..041a52f 100644
--- a/lib/cpp/src/server/TSimpleServer.cc
+++ b/lib/cpp/src/server/TSimpleServer.cc
@@ -1,5 +1,4 @@
 #include "server/TSimpleServer.h"
-#include "transport/TBufferedTransport.h"
 #include "transport/TTransportException.h"
 #include <string>
 #include <iostream>
@@ -15,6 +14,7 @@
 void TSimpleServer::run() {
 
   shared_ptr<TTransport> client;
+  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
 
   try {
     // Start the server listening
@@ -25,26 +25,21 @@
   }
 
   // Fetch client from server
-  while (true) {
-    try {
+  try {
+    while (true) {
       client = serverTransport_->accept();
-      if (client != NULL) {
-        // Process for as long as we can keep the processor happy!
-        shared_ptr<TBufferedTransport> bufferedClient(new TBufferedTransport(client));
-        while (processor_->process(bufferedClient)) {}
-      }
-    } catch (TTransportException& ttx) {
-      if (client != NULL) {
+      io = transportFactory_->getIOTransports(client);
+      try {
+        while (processor_->process(io.first, io.second)) {}
+      } catch (TTransportException& ttx) {
         cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
       }
-    }
-  
-    // Clean up the client
-    if (client != NULL) {
-
-      // Ensure no resource leaks
+      io.first->close();
+      io.second->close();
       client->close();
-     }
+    }
+  } catch (TTransportException& ttx) {
+    cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl;
   }
 
   // TODO(mcslee): Could this be a timeout case? Or always the real thing?
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index a8242d4..973ba30 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -1,5 +1,5 @@
-#ifndef T_SIMPLE_SERVER_H
-#define T_SIMPLE_SERVER_H
+#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
 
 #include "server/TServer.h"
 #include "transport/TServerTransport.h"
@@ -17,18 +17,17 @@
 class TSimpleServer : public TServer {
  public:
   TSimpleServer(shared_ptr<TProcessor> processor,
-                shared_ptr<TServerOptions> options,
-                shared_ptr<TServerTransport> serverTransport) :
-    TServer(processor, options), serverTransport_(serverTransport) {}
+                shared_ptr<TServerTransport> serverTransport,
+                shared_ptr<TTransportFactory> transportFactory,
+                shared_ptr<TServerOptions> options) :
+    TServer(processor, serverTransport, transportFactory, options) {}
     
   ~TSimpleServer() {}
 
   void run();
 
- protected:
-  shared_ptr<TServerTransport> serverTransport_;
 };
 
 }}} // facebook::thrift::server
 
-#endif
+#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index d53d174..1eab53d 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -1,5 +1,4 @@
 #include "server/TThreadPoolServer.h"
-#include "transport/TBufferedTransport.h"
 #include "transport/TTransportException.h"
 #include "concurrency/Thread.h"
 #include "concurrency/ThreadManager.h"
@@ -15,54 +14,52 @@
 class TThreadPoolServer::Task: public Runnable {
     
   shared_ptr<TProcessor> _processor;
-  shared_ptr<TTransport> _transport;
-  shared_ptr<TBufferedTransport> _bufferedTransport;
+  shared_ptr<TTransport> _input;
+  shared_ptr<TTransport> _output;
     
 public:
     
   Task(shared_ptr<TProcessor> processor,
-       shared_ptr<TTransport> transport) :
+       shared_ptr<TTransport> input,
+       shared_ptr<TTransport> output) :
     _processor(processor),
-    _transport(transport),
-    _bufferedTransport(new TBufferedTransport(transport)) {
+    _input(input),
+    _output(output) {
   }
 
   ~Task() {}
     
-  void run() {
-      
+  void run() {     
     while(true) {
-	
       try {
-	_processor->process(_bufferedTransport);
-	
+	_processor->process(_input, _output);
       } catch (TTransportException& ttx) {
-	
-	break;
-	
+        break;
       } catch(...) {
-	
-	break;
+        break;
       }
     }
-    
-    _bufferedTransport->close();
+    _input->close();
+    _output->close();
   }
 };
   
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
-				     shared_ptr<TServerOptions> options,
-				     shared_ptr<TServerTransport> serverTransport,
-				     shared_ptr<ThreadManager> threadManager) :
-  TServer(processor, options), 
-  serverTransport_(serverTransport), 
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> transportFactory,
+                                     shared_ptr<ThreadManager> threadManager,
+                                     shared_ptr<TServerOptions> options) :
+  TServer(processor, serverTransport, transportFactory, options), 
   threadManager_(threadManager) {
 }
-    
+
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::run() {
 
+  shared_ptr<TTransport> client;
+  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+
   try {
     // Start the server listening
     serverTransport_->listen();
@@ -71,15 +68,14 @@
     return;
   }
   
-  // Fetch client from server
-  
-  while (true) {
-    
+  while (true) {   
     try {
-      
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, 
-											  shared_ptr<TTransport>(serverTransport_->accept()))));
-      
+      // Fetch client from server
+      client = serverTransport_->accept();
+      // Make IO transports
+      io = transportFactory_->getIOTransports(client);
+      // Add to threadmanager pool
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
     } catch (TTransportException& ttx) {
       break;
     }
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index 827491d..34b216c 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -1,5 +1,5 @@
-#ifndef T_THREADPOOL_SERVER_H
-#define T_THREADPOOL_SERVER_H
+#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
+#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
 
 #include <concurrency/ThreadManager.h>
 #include <server/TServer.h>
@@ -19,9 +19,10 @@
   class Task;
   
   TThreadPoolServer(shared_ptr<TProcessor> processor,
-		    shared_ptr<TServerOptions> options,
 		    shared_ptr<TServerTransport> serverTransport,
-		    shared_ptr<ThreadManager> threadManager);
+		    shared_ptr<TTransportFactory> transportFactory,
+		    shared_ptr<ThreadManager> threadManager,
+		    shared_ptr<TServerOptions> options);
 
   virtual ~TThreadPoolServer();
 
@@ -29,11 +30,10 @@
 
 protected:
 
-  shared_ptr<TServerTransport> serverTransport_;
   shared_ptr<ThreadManager> threadManager_;
   
 };
 
 }}} // facebook::thrift::server
 
-#endif
+#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
diff --git a/lib/cpp/src/transport/TBufferedTransportFactory.h b/lib/cpp/src/transport/TBufferedTransportFactory.h
new file mode 100644
index 0000000..c6e87b1
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedTransportFactory.h
@@ -0,0 +1,33 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1
+
+#include <transport/TTransportFactory.h>
+#include <transport/TBufferedTransport.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+/**
+ * Wraps a transport into a buffered one.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransportFactory : public TTransportFactory {
+ public:
+  TBufferedTransportFactory() {}
+
+  virtual ~TBufferedTransportFactory() {}
+
+  /**
+   * Wraps the transport into a buffered one.
+   */
+  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+    boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
+    return std::make_pair(buffered, buffered);
+  }
+
+};
+
+}}}
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index 19a2cb6..d65d25b 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -1,7 +1,7 @@
 #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
 #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
 
-#include "TTransportException.h"
+#include <transport/TTransportException.h>
 #include <string>
 
 namespace facebook { namespace thrift { namespace transport { 
diff --git a/lib/cpp/src/transport/TTransportFactory.h b/lib/cpp/src/transport/TTransportFactory.h
new file mode 100644
index 0000000..abd1048
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportFactory.h
@@ -0,0 +1,33 @@
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1
+
+#include <transport/TTransport.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+/**
+ * Generic factory class to make an input and output transport out of a
+ * source transport. Commonly used inside servers to make input and output
+ * streams out of raw clients.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransportFactory {
+ public:
+  TTransportFactory() {}
+
+  virtual ~TTransportFactory() {}
+
+  /**
+   * Default implementation does nothing, just returns the transport given.
+   */
+  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+    return std::make_pair(trans, trans);
+  }
+
+};
+
+}}}
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 702ba69..dd83098 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -1,6 +1,9 @@
 package com.facebook.thrift.server;
 
 import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.transport.TBaseTransportFactory;
 
 /**
  * Generic interface for a Thrift server.
@@ -17,24 +20,64 @@
     public Options() {}
   }
 
-  /** Core processor */
+  /**
+   * Core processor
+   */
   protected TProcessor processor_;
 
-  /** Server options */
+  /**
+   * Server options
+   */
   protected Options options_;
 
   /**
-   * Default options constructor
+   * Server transport
    */
-  protected TServer(TProcessor processor) {
-    this(processor, new Options());
-  }
+  protected TServerTransport serverTransport_;
 
   /**
-   * Default constructor, all servers take a processor and some options.
+   * Transport Factory
    */
-  protected TServer(TProcessor processor, Options options) {
+  protected TTransportFactory transportFactory_;
+
+  /**
+   * Default constructors.
+   */
+
+  protected TServer(TProcessor processor,
+                    TServerTransport serverTransport) {
+    this(processor,
+         serverTransport,
+         new TBaseTransportFactory(),
+         new Options());
+  }
+
+  protected TServer(TProcessor processor,
+                    TServerTransport serverTransport,
+                    TTransportFactory transportFactory) {
+    this(processor,
+         serverTransport,
+         transportFactory,
+         new Options());
+  }
+
+
+  protected TServer(TProcessor processor,
+                    TServerTransport serverTransport,
+                    Options options) {
+    this(processor,
+         serverTransport,
+         new TBaseTransportFactory(),
+         options);
+  }
+
+  protected TServer(TProcessor processor,
+                    TServerTransport serverTransport,
+                    TTransportFactory transportFactory,
+                    Options options) {
     processor_ = processor;
+    serverTransport_ = serverTransport;
+    transportFactory_ = transportFactory;
     options_ = options;
   }
   
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 94b739e..76a5762 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -13,19 +13,9 @@
  */
 public class TSimpleServer extends TServer {
 
-  private TServerTransport serverTransport_;
-
   public TSimpleServer(TProcessor processor,
                        TServerTransport serverTransport) {
-    this(processor, new TServer.Options(), serverTransport);
-  }
-
-
-  public TSimpleServer(TProcessor processor,
-                       TServer.Options options,
-                       TServerTransport serverTransport) {
-    super(processor, options);
-    serverTransport_ = serverTransport;
+    super(processor, serverTransport);
   }
 
   public void run() {
@@ -38,18 +28,24 @@
 
     while (true) {
       TTransport client = null;
+      TTransport[] io = null;
       try {
         client = serverTransport_.accept();
         if (client != null) {
-          while (processor_.process(client, client));
+          io = transportFactory_.getIOTransports(client);
+          while (processor_.process(io[0], io[1]));
         }
       } catch (TException tx) {
         tx.printStackTrace();
       }
 
-      if (client != null) {
-        client.close();
-        client = null;
+      if (io != null) {
+        if (io[0] != null) {
+          io[0].close();
+        }
+        if (io[1] != null) {
+          io[1].close();
+        }
       }
     }
   }
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index d19275e..2f5be8d 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -5,6 +5,8 @@
 import com.facebook.thrift.transport.TServerTransport;
 import com.facebook.thrift.transport.TTransport;
 import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.transport.TBaseTransportFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -20,28 +22,28 @@
  */
 public class TThreadPoolServer extends TServer {
 
-  // Server transport
-  private TServerTransport serverTransport_;
-
   // Executor service for handling client connections
   private ExecutorService executorService_;
 
   // Customizable server options
   public static class Options extends TServer.Options {
-    public int port = 9190;
     public int minWorkerThreads = 5;
     public int maxWorkerThreads = Integer.MAX_VALUE;
   }
 
   public TThreadPoolServer(TProcessor processor,
                            TServerTransport serverTransport) {
-    this(processor, new Options(), serverTransport);
+    this(processor,
+         serverTransport,
+         new TBaseTransportFactory(),
+         new Options());
   }
-
+  
   public TThreadPoolServer(TProcessor processor,
-                           Options options,
-                           TServerTransport serverTransport) {
-    super(processor, options);
+                           TServerTransport serverTransport,
+                           TTransportFactory transportFactory,
+                           Options options) {
+    super(processor, serverTransport, transportFactory, options);
     serverTransport_ = serverTransport;
     executorService_ = null;
 
@@ -95,12 +97,22 @@
      * Loops on processing a client forever
      */
     public void run() {
+      TTransport[] io = null;
       try {
-        while (processor_.process(client_, client_)) {}
+        io = transportFactory_.getIOTransports(client_);
+        while (processor_.process(io[0], io[1])) {}
       } catch (TException tx) {
         tx.printStackTrace();
       }
-      client_.close();
+
+      if (io != null) {
+        if (io[0] != null) {
+          io[0].close();
+        }
+        if (io[1] != null) {
+          io[1].close();
+        }
+      }
     }
   }
 }
diff --git a/lib/java/src/transport/TBaseTransportFactory.java b/lib/java/src/transport/TBaseTransportFactory.java
new file mode 100644
index 0000000..90bbbe1
--- /dev/null
+++ b/lib/java/src/transport/TBaseTransportFactory.java
@@ -0,0 +1,23 @@
+package com.facebook.thrift.transport;
+
+/**
+ * Base transport factory just returns the arg transport.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TBaseTransportFactory implements TTransportFactory {
+
+  /**
+   * Returns a list of two transports (input, output) from a simple
+   * Transport.
+   *
+   * @param in The base transport
+   * @returns Array of two transports, first for input, second for output
+   */
+  public TTransport[] getIOTransports(TTransport in) {
+    TTransport[] out = new TTransport[2];
+    out[0] = out[1] = in;
+    return out;
+  }
+
+}
diff --git a/lib/java/src/transport/TTransportFactory.java b/lib/java/src/transport/TTransportFactory.java
new file mode 100644
index 0000000..8c7a093
--- /dev/null
+++ b/lib/java/src/transport/TTransportFactory.java
@@ -0,0 +1,21 @@
+package com.facebook.thrift.transport;
+
+/**
+ * Factory class used to create an input and output transport out of a simple
+ * transport. This is used primarily in servers, which get Transports from
+ * a ServerTransport and then may want to mutate them.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public interface TTransportFactory {
+
+  /**
+   * Returns a list of two transports (input, output) from a simple
+   * Transport.
+   *
+   * @param in The base transport
+   * @returns Array of two transports, first for input, second for output
+   */
+  public TTransport[] getIOTransports(TTransport in);
+
+}
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index a5d5621..53f6846 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -8,8 +8,13 @@
 
   """Base interface for a server, which must have a run method."""
 
-  def __init__(self, proc):
-    self.processor = proc
+  def __init__(self, processor, serverTransport, transportFactory=None):
+    self.processor = processor
+    self.serverTransport = serverTransport
+    if transportFactory == None:
+      self.transportFactory = TTransport.TTransportFactoryBase()
+    else:
+      self.transportFactory = transportFactory
 
   def run(self):
     pass
@@ -18,18 +23,20 @@
 
   """Simple single-threaded server that just pumps around one transport."""
 
-  def __init__(self, proc, trans):
-    TServer.__init__(self, proc)
-    self.transport = trans
+  def __init__(self, processor, serverTransport, transportFactory=None):
+    TServer.__init__(self, processor, serverTransport, transportFactory)
 
   def run(self):
-    self.transport.listen()
+    self.serverTransport.listen()
     while True:
-      client = TTransport.TBufferedTransport(self.transport.accept())
+      client = self.serverTransport.accept()
+      (input, output) = self.transportFactory.getIOTransports(client)
       try:
         while True:
-          self.processor.process(client, client)
+          self.processor.process(input, output)
       except Exception, x:
         print '%s, %s, %s' % (type(x), x, traceback.format_exc())
         print 'Client died.'
-      client.close()
+
+      input.close()
+      output.close()
diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py
index 61f1cff..2c7dd3e 100644
--- a/lib/py/src/transport/TSocket.py
+++ b/lib/py/src/transport/TSocket.py
@@ -21,8 +21,9 @@
     self.handle.connect((self.host, self.port))
 
   def close(self):
-    self.handle.close()
-    self.handle = None
+    if self.handle != None:
+      self.handle.close()
+      self.handle = None
 
   def readAll(self, sz):
     buff = ''
diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py
index 1e8b6c6..a7eb3b0 100644
--- a/lib/py/src/transport/TTransport.py
+++ b/lib/py/src/transport/TTransport.py
@@ -36,6 +36,22 @@
   def close(self):
     pass
 
+class TTransportFactoryBase:
+
+  """Base class for a Transport Factory"""
+
+  def getIOTransports(self, trans):
+    return (trans, trans)
+
+class TBufferedTransportFactory:
+
+  """Factory transport that builds buffered transports"""
+
+  def getIOTransports(self, trans):
+    buffered = TBufferedTransport(trans)
+    return (buffered, buffered)
+
+
 class TBufferedTransport(TTransportBase):
 
   """Class that wraps another transport and buffers its I/O."""
diff --git a/test/cpp/src/TestClient.cc b/test/cpp/src/TestClient.cc
index e12b65b..6334f08 100644
--- a/test/cpp/src/TestClient.cc
+++ b/test/cpp/src/TestClient.cc
@@ -45,23 +45,28 @@
   }
 
   shared_ptr<TSocket> socket(new TSocket(host, port));
-  shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
+  shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket));
   shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
   ThriftTestClient testClient(bufferedSocket, binaryProtocol);
-  
+
+  uint64_t time_min = 0;
+  uint64_t time_max = 0;
+  uint64_t time_tot = 0;
+ 
   int test = 0;
   for (test = 0; test < numTests; ++test) {
 
-    /**
-     * CONNECT TEST
-     */
-    printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port);
     try {
       bufferedSocket->open();
     } catch (TTransportException& ttx) {
       printf("Connect failed: %s\n", ttx.getMessage().c_str());
       continue;
     }
+    
+    /**
+     * CONNECT TEST
+     */
+    printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port);
 
     uint64_t start = now();
     
@@ -379,12 +384,29 @@
     }
     
     uint64_t stop = now();
+    uint64_t tot = stop-start;
+
     printf("Total time: %lu us\n", stop-start);
     
+    time_tot += tot;
+    if (time_min == 0 || tot < time_min) {
+      time_min = tot;
+    }
+    if (tot > time_max) {
+      time_max = tot;
+    }
+
     bufferedSocket->close();
   }
 
   //  printf("\nSocket syscalls: %u", g_socket_syscalls);
   printf("\nAll tests done.\n");
+
+  uint64_t time_avg = time_tot / numTests;
+
+  printf("Min time: %lu us\n", time_min);
+  printf("Max time: %lu us\n", time_max);
+  printf("Avg time: %lu us\n", time_avg);
+
   return 0;
 }
diff --git a/test/cpp/src/TestServer.cc b/test/cpp/src/TestServer.cc
index 97d3440..f743aea 100644
--- a/test/cpp/src/TestServer.cc
+++ b/test/cpp/src/TestServer.cc
@@ -4,6 +4,7 @@
 #include <server/TSimpleServer.h>
 #include <server/TThreadPoolServer.h>
 #include <transport/TServerSocket.h>
+#include <transport/TBufferedTransportFactory.h>
 #include "ThriftTest.h"
 
 #include <iostream>
@@ -53,23 +54,13 @@
   }
 
   Xtruct testStruct(Xtruct thing) {
-    printf("testStruct({\"%s\", %d, %d, %ld})\n",
-           thing.string_thing.c_str(),
-           (int)thing.byte_thing,
-           thing.i32_thing,
-           thing.i64_thing);
+    printf("testStruct({\"%s\", %d, %d, %ld})\n", thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing);
     return thing;
   }
 
   Xtruct2 testNest(Xtruct2 nest) {
     Xtruct thing = nest.struct_thing;
-    printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n",
-           (int)nest.byte_thing,
-           thing.string_thing.c_str(),
-           (int)thing.byte_thing,
-           thing.i32_thing,
-           thing.i64_thing,
-           nest.i32_thing);
+    printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n", (int)nest.byte_thing, thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing, nest.i32_thing);
     return nest;
   }
 
@@ -205,11 +196,7 @@
         list<Xtruct>::const_iterator x;
         printf("{");
         for (x = xtructs.begin(); x != xtructs.end(); ++x) {
-          printf("{\"%s\", %d, %d, %ld}, ",
-                 x->string_thing.c_str(),
-                 (int)x->byte_thing,
-                 x->i32_thing,
-                 x->i64_thing);
+          printf("{\"%s\", %d, %d, %ld}, ", x->string_thing.c_str(), (int)x->byte_thing, x->i32_thing, x->i64_thing);
         }
         printf("}");
 
@@ -347,18 +334,23 @@
 
   shared_ptr<ThriftTestServer> testServer(new ThriftTestServer(testHandler, binaryProtocol));
 
-  // Options
-  shared_ptr<TServerOptions> serverOptions(new TServerOptions());
-
   // Transport
   shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
 
+  // Factory
+  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
+
+  // Options
+  shared_ptr<TServerOptions> serverOptions(new TServerOptions());
+
   if (serverType == "simple") {
 
     // Server
     TSimpleServer simpleServer(testServer,
-			       serverOptions,
-			       serverSocket);
+			       serverSocket,
+                               transportFactory,
+			       serverOptions
+                               );
 
     printf("Starting the server on port %d...\n", port);
     simpleServer.run();
@@ -376,9 +368,10 @@
     threadManager->start();
 
     TThreadPoolServer threadPoolServer(testServer,
-				       serverOptions,
 				       serverSocket,
-				       threadManager);
+                                       transportFactory,
+				       threadManager,
+				       serverOptions);
 
     printf("Starting the server on port %d...\n", port);
     threadPoolServer.run();
diff --git a/test/java/src/TestClient.java b/test/java/src/TestClient.java
index 74fbfef..fda37de 100644
--- a/test/java/src/TestClient.java
+++ b/test/java/src/TestClient.java
@@ -42,13 +42,16 @@
       ThriftTest.Client testClient =
         new ThriftTest.Client(tSocket, binaryProtocol);
 
+      long timeMin = 0;
+      long timeMax = 0;
+      long timeTot = 0;
+
       for (int test = 0; test < numTests; ++test) {
 
         /**
          * CONNECT TEST
          */
-        System.out.println("Test #" + (test+1) + ", " +
-                           "connect " + host + ":" + port);
+        System.out.println("Test #" + (test+1) + ", " + "connect " + host + ":" + port);
         try {
           tSocket.open();
         } catch (TTransportException ttx) {
@@ -56,7 +59,7 @@
           continue;
         }
 
-        long start = System.currentTimeMillis();
+        long start = System.nanoTime();
     
         /**
          * VOID TEST
@@ -110,11 +113,7 @@
         out.i32_thing = -3;
         out.i64_thing = -5;
         Xtruct in = testClient.testStruct(out);
-        System.out.print(" = {" +
-                         "\"" + in.string_thing + "\", " +
-                         in.byte_thing + ", " +
-                         in.i32_thing + ", " +
-                         in.i64_thing + "}\n");
+        System.out.print(" = {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}\n");
 
         /**
          * NESTED STRUCT TEST
@@ -126,13 +125,7 @@
         out2.i32_thing = 5;
         Xtruct2 in2 = testClient.testNest(out2);
         in = in2.struct_thing;
-        System.out.print(" = {" +
-                         in2.byte_thing + ", {" +
-                         "\"" + in.string_thing + "\", " +
-                         in.byte_thing + ", " +
-                         in.i32_thing + ", " +
-                         in.i64_thing + "}, " +
-                         in2.i32_thing + "}\n");
+        System.out.print(" = {" + in2.byte_thing + ", {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}, " + in2.i32_thing + "}\n");
 
         /**
          * MAP TEST
@@ -299,19 +292,14 @@
             HashMap<Integer, Long> userMap = v2.userMap;
             System.out.print("{");
             for (int k3 : userMap.keySet()) {
-              System.out.print(k3 + " => " +
-                               userMap.get(k3) + ", ");
+              System.out.print(k3 + " => " + userMap.get(k3) + ", ");
             }
             System.out.print("}, ");
 
             ArrayList<Xtruct> xtructs = v2.xtructs;
             System.out.print("{");
             for (Xtruct x : xtructs) {
-              System.out.print("{" +
-                               "\"" + x.string_thing + "\", " +
-                               x.byte_thing + ", " +
-                               x.i32_thing + ", "+
-                               x.i64_thing + "}, ");
+              System.out.print("{" + "\"" + x.string_thing + "\", " + x.byte_thing + ", " + x.i32_thing + ", "+ x.i64_thing + "}, ");
             }
             System.out.print("}");
 
@@ -321,14 +309,32 @@
         }
         System.out.print("}\n");
 
-        long stop = System.currentTimeMillis();
-        System.out.println("Total time: " + (stop-start) + "ms");
+        long stop = System.nanoTime();
+        long tot = stop-start;
+
+        System.out.println("Total time: " + tot/1000 + "us");
+
+        if (timeMin == 0 || tot < timeMin) {
+          timeMin = tot;
+        }
+        if (tot > timeMax) {
+          timeMax = tot;
+        }
+        timeTot += tot;
 
         tSocket.close();
       }
+
+      long timeAvg = timeTot / numTests;
       
+      System.out.println("Min time: " + timeMin/1000 + "us");
+      System.out.println("Max time: " + timeMax/1000 + "us");
+      System.out.println("Avg time: " + timeAvg/1000 + "us");
+            
     } catch (Exception x) {
       x.printStackTrace();
-    }
+    }  
+
   }
+
 }
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index 525ffee..db2ad81 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -5,6 +5,7 @@
 
 import ThriftTest
 from ThriftTest_types import *
+from thrift.transport import TTransport
 from thrift.transport import TSocket
 from thrift.protocol import TBinaryProtocol
 from thrift.server import TServer
@@ -54,5 +55,6 @@
 protocol = TBinaryProtocol.TBinaryProtocol()
 handler = TestHandler()
 iface = ThriftTest.Server(handler, protocol)
-server = TServer.TSimpleServer(iface, transport)
+factory = TTransport.TBufferedTransportFactory()
+server = TServer.TSimpleServer(iface, transport, factory)
 server.run()