THRIFT-2013: add multiplex server and client test support to cpp language
add multiplex client test support to csharp and java languages
fix a bug in the server-side header protocol factory
fix a bug in the cpp SSL server socket implementation
remove unnecessary sleep in cpp server testOneway
This closes #1414
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index 6b2e731..6e7ff8e 100644
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -26,6 +26,7 @@
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
+#include <thrift/protocol/TMultiplexedProtocol.h>
#include <thrift/transport/THttpClient.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
@@ -40,13 +41,15 @@
#include <inttypes.h>
#endif
-#include <boost/program_options.hpp>
+#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
+#include <boost/program_options.hpp>
#include <thrift/stdcxx.h>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif
+#include "SecondService.h"
#include "ThriftTest.h"
using namespace std;
@@ -156,28 +159,33 @@
int return_code = 0;
boost::program_options::options_description desc("Allowed options");
- desc.add_options()("help,h",
- "produce help message")("host",
- boost::program_options::value<string>(&host)
- ->default_value(host),
- "Host to connect")("port",
- boost::program_options::value<int>(
- &port)->default_value(port),
- "Port number to connect")(
- "domain-socket",
- boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
- "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")(
- "abstract-namespace",
- "Look for the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")(
- "transport",
- boost::program_options::value<string>(&transport_type)->default_value(transport_type),
- "Transport: buffered, framed, http, evhttp")(
- "protocol",
- boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
- "Protocol: binary, header, compact, json")("ssl", "Encrypted Transport using SSL")(
- "testloops,n",
- boost::program_options::value<int>(&numTests)->default_value(numTests),
- "Number of Tests")("noinsane", "Do not run insanity test");
+ desc.add_options()
+ ("help,h", "produce help message")
+ ("host",
+ boost::program_options::value<string>(&host)->default_value(host),
+ "Host to connect")
+ ("port",
+ boost::program_options::value<int>(&port)->default_value(port),
+ "Port number to connect")
+ ("domain-socket",
+ boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
+ "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")
+ ("abstract-namespace",
+ "Look for the domain socket in the Abstract Namespace"
+ " (no connection with filesystem pathnames)")
+ ("transport",
+ boost::program_options::value<string>(&transport_type)->default_value(transport_type),
+ "Transport: buffered, framed, http, evhttp")
+ ("protocol",
+ boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
+ "Protocol: binary, compact, header, json, multi, multic, multih, multij")
+ ("ssl",
+ "Encrypted Transport using SSL")
+ ("testloops,n",
+ boost::program_options::value<int>(&numTests)->default_value(numTests),
+ "Number of Tests")
+ ("noinsane",
+ "Do not run insanity test");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
@@ -194,6 +202,10 @@
} else if (protocol_type == "compact") {
} else if (protocol_type == "header") {
} else if (protocol_type == "json") {
+ } else if (protocol_type == "multi") {
+ } else if (protocol_type == "multic") {
+ } else if (protocol_type == "multih") {
+ } else if (protocol_type == "multij") {
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
}
@@ -232,6 +244,7 @@
stdcxx::shared_ptr<TSocket> socket;
stdcxx::shared_ptr<TTransport> transport;
stdcxx::shared_ptr<TProtocol> protocol;
+ stdcxx::shared_ptr<TProtocol> protocol2; // SecondService for multiplexed
if (ssl) {
cout << "Client Certificate File: " << certPath << endl;
@@ -271,18 +284,20 @@
transport = bufferedSocket;
}
- if (protocol_type.compare("json") == 0) {
- stdcxx::shared_ptr<TProtocol> jsonProtocol(new TJSONProtocol(transport));
- protocol = jsonProtocol;
- } else if (protocol_type.compare("compact") == 0) {
- stdcxx::shared_ptr<TProtocol> compactProtocol(new TCompactProtocol(transport));
- protocol = compactProtocol;
- } else if (protocol_type == "header") {
- stdcxx::shared_ptr<TProtocol> headerProtocol(new THeaderProtocol(transport));
- protocol = headerProtocol;
+ if (protocol_type == "json" || protocol_type == "multij") {
+ protocol = stdcxx::make_shared<TJSONProtocol>(transport);
+ } else if (protocol_type == "compact" || protocol_type == "multic") {
+ protocol = stdcxx::make_shared<TCompactProtocol>(transport);
+ } else if (protocol_type == "header" || protocol_type == "multih") {
+ protocol = stdcxx::make_shared<THeaderProtocol>(transport);
} else {
- stdcxx::shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport));
- protocol = binaryProtocol;
+ protocol = stdcxx::make_shared<TBinaryProtocol>(transport);
+ }
+
+ if (boost::starts_with(protocol_type, "multi")) {
+ protocol2 = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "SecondService");
+ // we don't need access to the original protocol any more, so...
+ protocol = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "ThriftTest");
}
// Connection info
@@ -367,6 +382,25 @@
return_code |= ERR_BASETYPES;
}
+ //
+ // Multiplexed protocol - call another service method
+ // in the middle of the ThriftTest
+ //
+ if (boost::starts_with(protocol_type, "multi")) {
+ SecondServiceClient ssc(protocol2);
+ // transport is already open...
+
+ try {
+ cout << "secondService.secondTestString(\"foo\") => " << flush;
+ std::string result;
+ ssc.secondtestString(result, "foo");
+ cout << "{" << result << "}" << endl;
+ } catch (std::exception& e) {
+ cout << " *** FAILED *** " << e.what() << endl;
+ return_code |= ERR_EXCEPTIONS;
+ }
+ }
+
try {
#ifdef _MSC_VER
#pragma warning( push )
@@ -1096,12 +1130,14 @@
/**
* I32 TEST
*/
- cout << "re-test testI32(-1)";
+ cout << "re-test testI32(-1)" << flush;
int i32 = testClient.testI32(-1);
cout << " = " << i32 << endl;
if (i32 != -1)
return_code |= ERR_BASETYPES;
+ cout << endl << "All tests done." << endl << flush;
+
uint64_t stop = now();
uint64_t tot = stop - start;
@@ -1115,10 +1151,10 @@
time_max = tot;
}
+ cout << flush;
transport->close();
}
- cout << endl << "All tests done." << endl;
uint64_t time_avg = time_tot / numTests;
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 37d0eb6..744a86c 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -17,26 +17,29 @@
* under the License.
*/
-#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/async/TAsyncBufferProcessor.h>
+#include <thrift/async/TAsyncProtocolProcessor.h>
+#include <thrift/async/TEvhttpServer.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/processor/TMultiplexedProcessor.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
-#include <thrift/server/TSimpleServer.h>
-#include <thrift/server/TThreadedServer.h>
-#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/async/TEvhttpServer.h>
-#include <thrift/async/TAsyncBufferProcessor.h>
-#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/server/TNonblockingServer.h>
-#include <thrift/transport/TServerSocket.h>
-#include <thrift/transport/TSSLServerSocket.h>
-#include <thrift/transport/TSSLSocket.h>
-#include <thrift/transport/TNonblockingServerSocket.h>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/THttpServer.h>
#include <thrift/transport/THttpTransport.h>
+#include <thrift/transport/TNonblockingServerSocket.h>
+#include <thrift/transport/TSSLServerSocket.h>
+#include <thrift/transport/TSSLSocket.h>
+#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
+
+#include "SecondService.h"
#include "ThriftTest.h"
#ifdef HAVE_STDINT_H
@@ -50,6 +53,7 @@
#include <stdexcept>
#include <sstream>
+#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <thrift/stdcxx.h>
@@ -331,13 +335,18 @@
}
}
- void testOneway(const int32_t sleepFor) {
- printf("testOneway(%d): Sleeping...\n", sleepFor);
- THRIFT_SLEEP_SEC(sleepFor);
- printf("testOneway(%d): done sleeping!\n", sleepFor);
+ void testOneway(const int32_t aNum) {
+ printf("testOneway(%d): call received\n", aNum);
}
};
+class SecondHandler : public SecondServiceIf
+{
+ public:
+ void secondtestString(std::string& result, const std::string& thing)
+ { result = "testString(\"" + thing + "\")"; }
+};
+
class TestProcessorEventHandler : public TProcessorEventHandler {
virtual void* getContext(const char* fn_name, void* serverContext) {
(void)serverContext;
@@ -565,7 +574,7 @@
("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")
("server-type", po::value<string>(&server_type)->default_value(server_type), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"")
("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http")
- ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json")
+ ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json, multi, multic, multih, multij")
("ssl", "Encrypted Transport using SSL")
("processor-events", "processor-events")
("workers,n", po::value<size_t>(&workers)->default_value(workers), "Number of thread pools workers. Only valid for thread-pool server type")
@@ -597,6 +606,10 @@
} else if (protocol_type == "compact") {
} else if (protocol_type == "json") {
} else if (protocol_type == "header") {
+ } else if (protocol_type == "multi") { // multiplexed binary
+ } else if (protocol_type == "multic") { // multiplexed compact
+ } else if (protocol_type == "multih") { // multiplexed header
+ } else if (protocol_type == "multij") { // multiplexed json
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
}
@@ -627,15 +640,15 @@
// Dispatcher
stdcxx::shared_ptr<TProtocolFactory> protocolFactory;
- if (protocol_type == "json") {
+ if (protocol_type == "json" || protocol_type == "multij") {
stdcxx::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory());
protocolFactory = jsonProtocolFactory;
- } else if (protocol_type == "compact") {
+ } else if (protocol_type == "compact" || protocol_type == "multic") {
TCompactProtocolFactoryT<TBufferBase> *compactProtocolFactory = new TCompactProtocolFactoryT<TBufferBase>();
compactProtocolFactory->setContainerSizeLimit(container_limit);
compactProtocolFactory->setStringSizeLimit(string_limit);
protocolFactory.reset(compactProtocolFactory);
- } else if (protocol_type == "header") {
+ } else if (protocol_type == "header" || protocol_type == "multih") {
stdcxx::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
protocolFactory = headerProtocolFactory;
} else {
@@ -645,9 +658,9 @@
protocolFactory.reset(binaryProtocolFactory);
}
- // Processor
+ // Processors
stdcxx::shared_ptr<TestHandler> testHandler(new TestHandler());
- stdcxx::shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler));
+ stdcxx::shared_ptr<TProcessor> testProcessor(new ThriftTestProcessor(testHandler));
if (vm.count("processor-events")) {
testProcessor->setEventHandler(
@@ -706,6 +719,18 @@
}
cout << endl;
+ // Multiplexed Processor if needed
+ if (boost::starts_with(protocol_type, "multi")) {
+ stdcxx::shared_ptr<SecondHandler> secondHandler(new SecondHandler());
+ stdcxx::shared_ptr<SecondServiceProcessor> secondProcessor(new SecondServiceProcessor(secondHandler));
+
+ stdcxx::shared_ptr<TMultiplexedProcessor> multiplexedProcessor(new TMultiplexedProcessor());
+ multiplexedProcessor->registerDefault(testProcessor); // non-multi clients go to the default processor (multi:binary, multic:compact, ...)
+ multiplexedProcessor->registerProcessor("ThriftTest", testProcessor);
+ multiplexedProcessor->registerProcessor("SecondService", secondProcessor);
+ testProcessor = stdcxx::dynamic_pointer_cast<TProcessor>(multiplexedProcessor);
+ }
+
// Server
stdcxx::shared_ptr<apache::thrift::server::TServer> server;