THRIFT-2013: add multiplex server and client test support to cpp language
add multiplex client test support to csharp and java languages
fix a bug in the server-side header protocol factory
fix a bug in the cpp SSL server socket implementation
remove unnecessary sleep in cpp server testOneway

This closes #1414
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index 6b2e731..6e7ff8e 100644
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -26,6 +26,7 @@
 #include <thrift/protocol/TCompactProtocol.h>
 #include <thrift/protocol/THeaderProtocol.h>
 #include <thrift/protocol/TJSONProtocol.h>
+#include <thrift/protocol/TMultiplexedProtocol.h>
 #include <thrift/transport/THttpClient.h>
 #include <thrift/transport/TTransportUtils.h>
 #include <thrift/transport/TSocket.h>
@@ -40,13 +41,15 @@
 #include <inttypes.h>
 #endif
 
-#include <boost/program_options.hpp>
+#include <boost/algorithm/string.hpp>
 #include <boost/filesystem.hpp>
+#include <boost/program_options.hpp>
 #include <thrift/stdcxx.h>
 #if _WIN32
 #include <thrift/windows/TWinsockSingleton.h>
 #endif
 
+#include "SecondService.h"
 #include "ThriftTest.h"
 
 using namespace std;
@@ -156,28 +159,33 @@
   int return_code = 0;
 
   boost::program_options::options_description desc("Allowed options");
-  desc.add_options()("help,h",
-                     "produce help message")("host",
-                                             boost::program_options::value<string>(&host)
-                                                 ->default_value(host),
-                                             "Host to connect")("port",
-                                                                boost::program_options::value<int>(
-                                                                    &port)->default_value(port),
-                                                                "Port number to connect")(
-      "domain-socket",
-      boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
-      "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")(
-      "abstract-namespace",
-      "Look for the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")(
-      "transport",
-      boost::program_options::value<string>(&transport_type)->default_value(transport_type),
-      "Transport: buffered, framed, http, evhttp")(
-      "protocol",
-      boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
-      "Protocol: binary, header, compact, json")("ssl", "Encrypted Transport using SSL")(
-      "testloops,n",
-      boost::program_options::value<int>(&numTests)->default_value(numTests),
-      "Number of Tests")("noinsane", "Do not run insanity test");
+  desc.add_options()
+      ("help,h", "produce help message")
+      ("host", 
+          boost::program_options::value<string>(&host)->default_value(host), 
+          "Host to connect")
+      ("port", 
+          boost::program_options::value<int>(&port)->default_value(port), 
+          "Port number to connect")
+      ("domain-socket", 
+          boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
+          "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")
+      ("abstract-namespace",
+          "Look for the domain socket in the Abstract Namespace"
+          " (no connection with filesystem pathnames)")
+      ("transport",
+          boost::program_options::value<string>(&transport_type)->default_value(transport_type),
+          "Transport: buffered, framed, http, evhttp")
+      ("protocol",
+          boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
+          "Protocol: binary, compact, header, json, multi, multic, multih, multij")
+      ("ssl", 
+          "Encrypted Transport using SSL")
+      ("testloops,n",
+          boost::program_options::value<int>(&numTests)->default_value(numTests),
+          "Number of Tests")
+      ("noinsane",
+          "Do not run insanity test");
 
   boost::program_options::variables_map vm;
   boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
@@ -194,6 +202,10 @@
       } else if (protocol_type == "compact") {
       } else if (protocol_type == "header") {
       } else if (protocol_type == "json") {
+      } else if (protocol_type == "multi") {
+      } else if (protocol_type == "multic") {
+      } else if (protocol_type == "multih") {
+      } else if (protocol_type == "multij") {
       } else {
         throw invalid_argument("Unknown protocol type " + protocol_type);
       }
@@ -232,6 +244,7 @@
   stdcxx::shared_ptr<TSocket> socket;
   stdcxx::shared_ptr<TTransport> transport;
   stdcxx::shared_ptr<TProtocol> protocol;
+  stdcxx::shared_ptr<TProtocol> protocol2;	// SecondService for multiplexed
 
   if (ssl) {
     cout << "Client Certificate File: " << certPath << endl;
@@ -271,18 +284,20 @@
     transport = bufferedSocket;
   }
 
-  if (protocol_type.compare("json") == 0) {
-    stdcxx::shared_ptr<TProtocol> jsonProtocol(new TJSONProtocol(transport));
-    protocol = jsonProtocol;
-  } else if (protocol_type.compare("compact") == 0) {
-    stdcxx::shared_ptr<TProtocol> compactProtocol(new TCompactProtocol(transport));
-    protocol = compactProtocol;
-  } else if (protocol_type == "header") {
-    stdcxx::shared_ptr<TProtocol> headerProtocol(new THeaderProtocol(transport));
-    protocol = headerProtocol;
+  if (protocol_type == "json" || protocol_type == "multij") {
+    protocol = stdcxx::make_shared<TJSONProtocol>(transport);
+  } else if (protocol_type == "compact" || protocol_type == "multic") {
+    protocol = stdcxx::make_shared<TCompactProtocol>(transport);
+  } else if (protocol_type == "header" || protocol_type == "multih") {
+    protocol = stdcxx::make_shared<THeaderProtocol>(transport);
   } else {
-    stdcxx::shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport));
-    protocol = binaryProtocol;
+    protocol = stdcxx::make_shared<TBinaryProtocol>(transport);
+  }
+
+  if (boost::starts_with(protocol_type, "multi")) {
+	protocol2 = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "SecondService");
+	// we don't need access to the original protocol any more, so...
+	protocol = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "ThriftTest");
   }
 
   // Connection info
@@ -367,6 +382,25 @@
       return_code |= ERR_BASETYPES;
     }
 
+    //
+    // Multiplexed protocol - call another service method
+    // in the middle of the ThriftTest
+    //
+    if (boost::starts_with(protocol_type, "multi")) {
+		SecondServiceClient ssc(protocol2);
+		// transport is already open...
+		  
+        try {
+          cout << "secondService.secondTestString(\"foo\") => " << flush;
+	  	  std::string result;
+		  ssc.secondtestString(result, "foo");
+		  cout << "{" << result << "}" << endl;
+	    } catch (std::exception& e) {
+		  cout << "  *** FAILED *** " << e.what() << endl;
+		  return_code |= ERR_EXCEPTIONS;
+		}
+    }
+
     try {
 #ifdef _MSC_VER
 #pragma warning( push )
@@ -1096,12 +1130,14 @@
     /**
      * I32 TEST
      */
-    cout << "re-test testI32(-1)";
+    cout << "re-test testI32(-1)" << flush;
     int i32 = testClient.testI32(-1);
     cout << " = " << i32 << endl;
     if (i32 != -1)
       return_code |= ERR_BASETYPES;
 
+    cout << endl << "All tests done." << endl << flush;
+    
     uint64_t stop = now();
     uint64_t tot = stop - start;
 
@@ -1115,10 +1151,10 @@
       time_max = tot;
     }
 
+    cout << flush;
     transport->close();
   }
 
-  cout << endl << "All tests done." << endl;
 
   uint64_t time_avg = time_tot / numTests;
 
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 37d0eb6..744a86c 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -17,26 +17,29 @@
  * under the License.
  */
 
-#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/async/TAsyncBufferProcessor.h>
+#include <thrift/async/TAsyncProtocolProcessor.h>
+#include <thrift/async/TEvhttpServer.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/processor/TMultiplexedProcessor.h>
 #include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/protocol/TCompactProtocol.h>
 #include <thrift/protocol/THeaderProtocol.h>
 #include <thrift/protocol/TJSONProtocol.h>
-#include <thrift/server/TSimpleServer.h>
-#include <thrift/server/TThreadedServer.h>
-#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/async/TEvhttpServer.h>
-#include <thrift/async/TAsyncBufferProcessor.h>
-#include <thrift/async/TAsyncProtocolProcessor.h>
 #include <thrift/server/TNonblockingServer.h>
-#include <thrift/transport/TServerSocket.h>
-#include <thrift/transport/TSSLServerSocket.h>
-#include <thrift/transport/TSSLSocket.h>
-#include <thrift/transport/TNonblockingServerSocket.h>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TThreadedServer.h>
 #include <thrift/transport/THttpServer.h>
 #include <thrift/transport/THttpTransport.h>
+#include <thrift/transport/TNonblockingServerSocket.h>
+#include <thrift/transport/TSSLServerSocket.h>
+#include <thrift/transport/TSSLSocket.h>
+#include <thrift/transport/TServerSocket.h>
 #include <thrift/transport/TTransportUtils.h>
+
+#include "SecondService.h"
 #include "ThriftTest.h"
 
 #ifdef HAVE_STDINT_H
@@ -50,6 +53,7 @@
 #include <stdexcept>
 #include <sstream>
 
+#include <boost/algorithm/string.hpp>
 #include <boost/program_options.hpp>
 #include <boost/filesystem.hpp>
 #include <thrift/stdcxx.h>
@@ -331,13 +335,18 @@
     }
   }
 
-  void testOneway(const int32_t sleepFor) {
-    printf("testOneway(%d): Sleeping...\n", sleepFor);
-    THRIFT_SLEEP_SEC(sleepFor);
-    printf("testOneway(%d): done sleeping!\n", sleepFor);
+  void testOneway(const int32_t aNum) {
+    printf("testOneway(%d): call received\n", aNum);
   }
 };
 
+class SecondHandler : public SecondServiceIf
+{
+  public:
+    void secondtestString(std::string& result, const std::string& thing)
+    { result = "testString(\"" + thing + "\")"; }
+};	
+
 class TestProcessorEventHandler : public TProcessorEventHandler {
   virtual void* getContext(const char* fn_name, void* serverContext) {
     (void)serverContext;
@@ -565,7 +574,7 @@
     ("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")
     ("server-type", po::value<string>(&server_type)->default_value(server_type), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"")
     ("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http")
-    ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json")
+    ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json, multi, multic, multih, multij")
     ("ssl", "Encrypted Transport using SSL")
     ("processor-events", "processor-events")
     ("workers,n", po::value<size_t>(&workers)->default_value(workers), "Number of thread pools workers. Only valid for thread-pool server type")
@@ -597,6 +606,10 @@
       } else if (protocol_type == "compact") {
       } else if (protocol_type == "json") {
       } else if (protocol_type == "header") {
+      } else if (protocol_type == "multi") {	// multiplexed binary
+      } else if (protocol_type == "multic") {	// multiplexed compact
+      } else if (protocol_type == "multih") {	// multiplexed header
+      } else if (protocol_type == "multij") {	// multiplexed json
       } else {
         throw invalid_argument("Unknown protocol type " + protocol_type);
       }
@@ -627,15 +640,15 @@
 
   // Dispatcher
   stdcxx::shared_ptr<TProtocolFactory> protocolFactory;
-  if (protocol_type == "json") {
+  if (protocol_type == "json" || protocol_type == "multij") {
     stdcxx::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory());
     protocolFactory = jsonProtocolFactory;
-  } else if (protocol_type == "compact") {
+  } else if (protocol_type == "compact" || protocol_type == "multic") {
     TCompactProtocolFactoryT<TBufferBase> *compactProtocolFactory = new TCompactProtocolFactoryT<TBufferBase>();
     compactProtocolFactory->setContainerSizeLimit(container_limit);
     compactProtocolFactory->setStringSizeLimit(string_limit);
     protocolFactory.reset(compactProtocolFactory);
-  } else if (protocol_type == "header") {
+  } else if (protocol_type == "header" || protocol_type == "multih") {
     stdcxx::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
     protocolFactory = headerProtocolFactory;
   } else {
@@ -645,9 +658,9 @@
     protocolFactory.reset(binaryProtocolFactory);
   }
 
-  // Processor
+  // Processors
   stdcxx::shared_ptr<TestHandler> testHandler(new TestHandler());
-  stdcxx::shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler));
+  stdcxx::shared_ptr<TProcessor> testProcessor(new ThriftTestProcessor(testHandler));
 
   if (vm.count("processor-events")) {
     testProcessor->setEventHandler(
@@ -706,6 +719,18 @@
   }
   cout << endl;
 
+  // Multiplexed Processor if needed
+  if (boost::starts_with(protocol_type, "multi")) {
+    stdcxx::shared_ptr<SecondHandler> secondHandler(new SecondHandler());
+    stdcxx::shared_ptr<SecondServiceProcessor> secondProcessor(new SecondServiceProcessor(secondHandler));
+
+    stdcxx::shared_ptr<TMultiplexedProcessor> multiplexedProcessor(new TMultiplexedProcessor());
+    multiplexedProcessor->registerDefault(testProcessor); // non-multi clients go to the default processor (multi:binary, multic:compact, ...)
+    multiplexedProcessor->registerProcessor("ThriftTest", testProcessor);
+    multiplexedProcessor->registerProcessor("SecondService", secondProcessor);
+    testProcessor = stdcxx::dynamic_pointer_cast<TProcessor>(multiplexedProcessor);
+  }
+
   // Server
   stdcxx::shared_ptr<apache::thrift::server::TServer> server;