THRIFT-3247 Generate a C++ thread-safe client
Client: cpp
Patch: Ben Craig <bencraig@apache.org>
diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp
index fa468a4..9371bce 100644
--- a/test/cpp/src/StressTest.cpp
+++ b/test/cpp/src/StressTest.cpp
@@ -33,7 +33,6 @@
 #include <thrift/TLogging.h>
 
 #include "Service.h"
-
 #include <iostream>
 #include <set>
 #include <stdexcept>
@@ -102,20 +101,26 @@
   Mutex lock_;
 };
 
+enum TransportOpenCloseBehavior { // Tells ClientThread::run() whether it manages the transport itself.
+  OpenAndCloseTransportInThread,   // run() opens the transport before its call loop and closes it afterwards.
+  DontOpenAndCloseTransportInThread // run() neither opens nor closes the transport; it must already be open.
+};
 class ClientThread : public Runnable {
 public:
   ClientThread(boost::shared_ptr<TTransport> transport,
-               boost::shared_ptr<ServiceClient> client,
+               boost::shared_ptr<ServiceIf> client,
                Monitor& monitor,
                size_t& workerCount,
                size_t loopCount,
-               TType loopType)
+               TType loopType,
+               TransportOpenCloseBehavior behavior)
     : _transport(transport),
       _client(client),
       _monitor(monitor),
       _workerCount(workerCount),
       _loopCount(loopCount),
-      _loopType(loopType) {}
+      _loopType(loopType),
+      _behavior(behavior) {}
 
   void run() {
 
@@ -129,8 +134,9 @@
     }
 
     _startTime = Util::currentTime();
-
-    _transport->open();
+    if(_behavior == OpenAndCloseTransportInThread) {
+      _transport->open();
+    }
 
     switch (_loopType) {
     case T_VOID:
@@ -155,7 +161,9 @@
 
     _endTime = Util::currentTime();
 
-    _transport->close();
+    if(_behavior == OpenAndCloseTransportInThread) {
+      _transport->close();
+    }
 
     _done = true;
 
@@ -217,7 +225,7 @@
   }
 
   boost::shared_ptr<TTransport> _transport;
-  boost::shared_ptr<ServiceClient> _client;
+  boost::shared_ptr<ServiceIf> _client;
   Monitor& _monitor;
   size_t& _workerCount;
   size_t _loopCount;
@@ -226,6 +234,7 @@
   int64_t _endTime;
   bool _done;
   Monitor _sleep;
+  TransportOpenCloseBehavior _behavior;
 };
 
 class TStartObserver : public apache::thrift::server::TServerEventHandler {
@@ -253,6 +262,7 @@
 #endif
 
   int port = 9091;
+  string clientType = "regular";
   string serverType = "thread-pool";
   string protocolType = "binary";
   size_t workerCount = 4;
@@ -269,23 +279,23 @@
 
   usage << argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
                       "[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
-                      "[--clients=<client-count>] [--loop=<loop-count>]" << endl
+                      "[--clients=<client-count>] [--loop=<loop-count>] "
+                      "[--client-type=<client-type>]" << endl
         << "\tclients        Number of client threads to create - 0 implies no clients, i.e. "
-           "server only.  Default is " << clientCount << endl
+                            "server only.  Default is " << clientCount << endl
         << "\thelp           Prints this help text." << endl
         << "\tcall           Service method to call.  Default is " << callName << endl
-        << "\tloop           The number of remote thrift calls each client makes.  Default is "
-        << loopCount << endl << "\tport           The port the server and clients should bind to "
-                                "for thrift network connections.  Default is " << port << endl
-        << "\tserver         Run the Thrift server in this process.  Default is " << runServer
-        << endl << "\tserver-type    Type of server, \"simple\" or \"thread-pool\".  Default is "
-        << serverType << endl
-        << "\tprotocol-type  Type of protocol, \"binary\", \"ascii\", or \"xml\".  Default is "
-        << protocolType << endl
-        << "\tlog-request    Log all request to ./requestlog.tlog. Default is " << logRequests
-        << endl << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is "
-        << replayRequests << endl << "\tworkers        Number of thread pools workers.  Only valid "
-                                     "for thread-pool server type.  Default is " << workerCount
+        << "\tloop           The number of remote thrift calls each client makes.  Default is " << loopCount << endl 
+        << "\tport           The port the server and clients should bind to "
+                            "for thrift network connections.  Default is " << port << endl
+        << "\tserver         Run the Thrift server in this process.  Default is " << runServer << endl 
+        << "\tserver-type    Type of server, \"simple\" or \"thread-pool\".  Default is " << serverType << endl
+        << "\tprotocol-type  Type of protocol, \"binary\", \"ascii\", or \"xml\".  Default is " << protocolType << endl
+        << "\tlog-request    Log all request to ./requestlog.tlog. Default is " << logRequests << endl 
+        << "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl 
+        << "\tworkers        Number of thread pools workers.  Only valid "
+                            "for thread-pool server type.  Default is " << workerCount << endl
+        << "\tclient-type    Type of client, \"regular\" or \"concurrent\".  Default is " << clientType << endl
         << endl;
 
   map<string, string> args;
@@ -359,7 +369,18 @@
         throw invalid_argument("Unknown server type " + serverType);
       }
     }
+    if (!args["client-type"].empty()) {
+      clientType = args["client-type"];
 
+      if (clientType == "regular") {
+
+      } else if (clientType == "concurrent") {
+
+      } else {
+
+        throw invalid_argument("Unknown client type " + clientType);
+      }
+    }
     if (!args["workers"].empty()) {
       workerCount = atoi(args["workers"].c_str());
     }
@@ -458,7 +479,7 @@
     }
   }
 
-  if (clientCount > 0) {
+  if (clientCount > 0) { // Client setup; the connection layout is chosen by clientType further down.
 
     Monitor monitor;
 
@@ -480,15 +501,28 @@
       throw invalid_argument("Unknown service call " + callName);
     }
 
-    for (size_t ix = 0; ix < clientCount; ix++) {
+    if(clientType == "regular") {
+      for (size_t ix = 0; ix < clientCount; ix++) {
 
+        boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
+        boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
+        boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
+        boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
+
+        clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
+            new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, OpenAndCloseTransportInThread))));
+      }
+    } else if(clientType == "concurrent") {
       boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
       boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
       boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
-      boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
-
-      clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
-          new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
+      // A single ServiceConcurrentClient (one connection) is shared by every client thread.
+      boost::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol));
+      socket->open();
+      for (size_t ix = 0; ix < clientCount; ix++) {
+        clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
+            new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, DontOpenAndCloseTransportInThread))));
+      }
     }
 
     for (std::set<boost::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
@@ -504,7 +538,7 @@
       Synchronized s(monitor);
       threadCount = clientCount;
 
-      cerr << "Launch " << clientCount << " client threads" << endl;
+      cerr << "Launch " << clientCount << " " << clientType << " client threads" << endl;
 
       time00 = Util::currentTime();