THRIFT-2013: add multiplex server and client test support to cpp language
add multiplex client test support to csharp and java languages
fix a bug in the server-side header protocol factory
fix a bug in the cpp SSL server socket implementation
remove unnecessary sleep in cpp server testOneway
This closes #1414
diff --git a/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
index 13b09bb..aa3d49f 100644
--- a/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
+++ b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
@@ -27,8 +27,6 @@
namespace apache {
namespace thrift {
-using stdcxx::shared_ptr;
-
namespace protocol {
/**
@@ -38,7 +36,7 @@
*/
class StoredMessageProtocol : public TProtocolDecorator {
public:
- StoredMessageProtocol(shared_ptr<protocol::TProtocol> _protocol,
+ StoredMessageProtocol(stdcxx::shared_ptr<protocol::TProtocol> _protocol,
const std::string& _name,
const TMessageType _type,
const int32_t _seqid)
@@ -67,19 +65,19 @@
* processors with it, as shown in the following example:</p>
*
* <blockquote><code>
- * shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
+ * stdcxx::shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
*
* processor->registerProcessor(
* "Calculator",
- * shared_ptr<TProcessor>( new CalculatorProcessor(
- * shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
+ * stdcxx::shared_ptr<TProcessor>( new CalculatorProcessor(
+ * stdcxx::shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
*
* processor->registerProcessor(
* "WeatherReport",
- * shared_ptr<TProcessor>( new WeatherReportProcessor(
- * shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
+ * stdcxx::shared_ptr<TProcessor>( new WeatherReportProcessor(
+ * stdcxx::shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
*
- * shared_ptr<TServerTransport> transport(new TServerSocket(9090));
+ * stdcxx::shared_ptr<TServerTransport> transport(new TServerSocket(9090));
* TSimpleServer server(processor, transport);
*
* server.serve();
@@ -87,7 +85,7 @@
*/
class TMultiplexedProcessor : public TProcessor {
public:
- typedef std::map<std::string, shared_ptr<TProcessor> > services_t;
+ typedef std::map<std::string, stdcxx::shared_ptr<TProcessor> > services_t;
/**
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
@@ -100,11 +98,41 @@
* as "handlers", e.g. WeatherReportHandler,
* implementing WeatherReportIf interface.
*/
- void registerProcessor(const std::string& serviceName, shared_ptr<TProcessor> processor) {
+ void registerProcessor(const std::string& serviceName, stdcxx::shared_ptr<TProcessor> processor) {
services[serviceName] = processor;
}
/**
+ * Register a service to be called to process queries without service name
+ * \param [in] processor Implementation of a service.
+ */
+ void registerDefault(const stdcxx::shared_ptr<TProcessor>& processor) {
+ defaultProcessor = processor;
+ }
+
+ /**
+ * Chew up invalid input and return an exception to throw.
+ */
+ TException protocol_error(stdcxx::shared_ptr<protocol::TProtocol> in,
+ stdcxx::shared_ptr<protocol::TProtocol> out,
+ const std::string& name,
+ int32_t seqid,
+ const std::string& msg) const {
+ in->skip(::apache::thrift::protocol::T_STRUCT);
+ in->readMessageEnd();
+ in->getTransport()->readEnd();
+ ::apache::thrift::TApplicationException
+ x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
+ "TMultiplexedProcessor: " + msg);
+ out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(out.get());
+ out->writeMessageEnd();
+ out->getTransport()->writeEnd();
+ out->getTransport()->flush();
+ return TException(msg);
+}
+
+ /**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
@@ -119,8 +147,8 @@
* the service name was not found in the message, or if the service
* name was not found in the service map.
*/
- bool process(shared_ptr<protocol::TProtocol> in,
- shared_ptr<protocol::TProtocol> out,
+ bool process(stdcxx::shared_ptr<protocol::TProtocol> in,
+ stdcxx::shared_ptr<protocol::TProtocol> out,
void* connectionContext) {
std::string name;
protocol::TMessageType type;
@@ -133,22 +161,10 @@
if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
// Unexpected message type.
- in->skip(::apache::thrift::protocol::T_STRUCT);
- in->readMessageEnd();
- in->getTransport()->readEnd();
- const std::string msg("TMultiplexedProcessor: Unexpected message type");
- ::apache::thrift::TApplicationException
- x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, msg);
- out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
- x.write(out.get());
- out->writeMessageEnd();
- out->getTransport()->writeEnd();
- out->getTransport()->flush();
- throw TException(msg);
+ throw protocol_error(in, out, name, seqid, "Unexpected message type");
}
// Extract the service name
-
boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
std::vector<std::string> tokens;
@@ -161,39 +177,46 @@
services_t::iterator it = services.find(tokens[0]);
if (it != services.end()) {
- shared_ptr<TProcessor> processor = it->second;
+ stdcxx::shared_ptr<TProcessor> processor = it->second;
// Let the processor registered for this service name
// process the message.
return processor
- ->process(shared_ptr<protocol::TProtocol>(
+ ->process(stdcxx::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
out,
connectionContext);
} else {
// Unknown service.
- in->skip(::apache::thrift::protocol::T_STRUCT);
- in->readMessageEnd();
- in->getTransport()->readEnd();
-
- std::string msg("TMultiplexedProcessor: Unknown service: ");
- msg += tokens[0];
- ::apache::thrift::TApplicationException
- x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, msg);
- out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
- x.write(out.get());
- out->writeMessageEnd();
- out->getTransport()->writeEnd();
- out->getTransport()->flush();
- msg += ". Did you forget to call registerProcessor()?";
- throw TException(msg);
+ throw protocol_error(in, out, name, seqid,
+ "Unknown service: " + tokens[0] +
+ ". Did you forget to call registerProcessor()?");
}
+ } else if (tokens.size() == 1) {
+ if (defaultProcessor) {
+ // non-multiplexed client forwards to default processor
+ return defaultProcessor
+ ->process(stdcxx::shared_ptr<protocol::TProtocol>(
+ new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
+ out,
+ connectionContext);
+ } else {
+ throw protocol_error(in, out, name, seqid,
+ "Non-multiplexed client request dropped. "
+ "Did you forget to call defaultProcessor()?");
+ }
+ } else {
+ throw protocol_error(in, out, name, seqid,
+ "Wrong number of tokens.");
}
- return false;
}
private:
/** Map of service processor objects, indexed by service names. */
services_t services;
+
+ //! If a non-multi client requests something, it goes to the
+ //! default processor (if one is defined) for backwards compatibility.
+ stdcxx::shared_ptr<TProcessor> defaultProcessor;
};
}
}
diff --git a/lib/cpp/src/thrift/protocol/THeaderProtocol.h b/lib/cpp/src/thrift/protocol/THeaderProtocol.h
index 0b3997c..8cd5017 100644
--- a/lib/cpp/src/thrift/protocol/THeaderProtocol.h
+++ b/lib/cpp/src/thrift/protocol/THeaderProtocol.h
@@ -192,7 +192,7 @@
public:
virtual stdcxx::shared_ptr<TProtocol> getProtocol(stdcxx::shared_ptr<transport::TTransport> trans) {
THeaderProtocol* headerProtocol
- = new THeaderProtocol(trans, stdcxx::shared_ptr<transport::TTransport>(), T_BINARY_PROTOCOL);
+ = new THeaderProtocol(trans, trans, T_BINARY_PROTOCOL);
return stdcxx::shared_ptr<TProtocol>(headerProtocol);
}
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index ddefb34..3f0e28e 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -157,7 +157,7 @@
mutexes.reset();
}
-static void buildErrors(string& message, int error = 0);
+static void buildErrors(string& message, int errno_copy = 0, int sslerrno = 0);
static bool matchName(const char* host, const char* pattern, int size);
static char uppercase(char c);
@@ -301,7 +301,7 @@
default:;// do nothing
}
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
throw TSSLException("SSL_peek: " + errors);
} else if (rc == 0) {
ERR_clear_error();
@@ -325,12 +325,14 @@
if (ssl_ != NULL) {
try {
int rc;
+ int errno_copy = 0;
+ int error = 0;
do {
rc = SSL_shutdown(ssl_);
if (rc <= 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- int error = SSL_get_error(ssl_, rc);
+ errno_copy = THRIFT_GET_SOCKET_ERROR;
+ error = SSL_get_error(ssl_, rc);
switch (error) {
case SSL_ERROR_SYSCALL:
if ((errno_copy != THRIFT_EINTR)
@@ -348,9 +350,8 @@
} while (rc == 2);
if (rc < 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
GlobalOutput(("SSL_shutdown: " + errors).c_str());
}
} catch (TTransportException& te) {
@@ -380,17 +381,19 @@
throw TTransportException(TTransportException::UNKNOWN, "retry again");
int32_t bytes = 0;
while (readRetryCount_ < maxRecvRetries_) {
- ERR_clear_error();
bytes = SSL_read(ssl_, buf, len);
+ int32_t errno_copy = THRIFT_GET_SOCKET_ERROR;
int32_t error = SSL_get_error(ssl_, bytes);
readRetryCount_++;
- if (bytes >= 0 && error == 0) {
+ if (error == SSL_ERROR_NONE) {
readRetryCount_ = 0;
break;
}
- int32_t errno_copy = THRIFT_GET_SOCKET_ERROR;
unsigned int waitEventReturn;
switch (error) {
+ case SSL_ERROR_ZERO_RETURN:
+ throw TTransportException(TTransportException::END_OF_FILE, "client disconnected");
+
case SSL_ERROR_SYSCALL:
if ((errno_copy != THRIFT_EINTR)
&& (errno_copy != THRIFT_EAGAIN)) {
@@ -422,9 +425,9 @@
throw TTransportException(TTransportException::INTERNAL_ERROR, "too much recv retries");
}
else if (waitEventReturn == TSSL_DATA) {
- // in case of SSL and huge thrift packets, there may be a number of
- // socket operations, before any data becomes available by SSL_read().
- // Therefore the number of retries should not be increased and
+ // in case of SSL and huge thrift packets, there may be a number of
+ // socket operations, before any data becomes available by SSL_read().
+ // Therefore the number of retries should not be increased and
// the operation should be repeated.
readRetryCount_--;
continue;
@@ -433,7 +436,7 @@
default:;// do nothing
}
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
throw TSSLException("SSL_read: " + errors);
}
return bytes;
@@ -470,7 +473,7 @@
default:;// do nothing
}
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
throw TSSLException("SSL_write: " + errors);
}
written += bytes;
@@ -514,7 +517,7 @@
default:;// do nothing
}
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
throw TSSLException("SSL_write: " + errors);
}
written += bytes;
@@ -574,12 +577,14 @@
}
int rc;
+ int errno_copy = 0;
+ int error = 0;
if (server()) {
do {
rc = SSL_accept(ssl_);
if (rc <= 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- int error = SSL_get_error(ssl_, rc);
+ errno_copy = THRIFT_GET_SOCKET_ERROR;
+ error = SSL_get_error(ssl_, rc);
switch (error) {
case SSL_ERROR_SYSCALL:
if ((errno_copy != THRIFT_EINTR)
@@ -610,8 +615,8 @@
do {
rc = SSL_connect(ssl_);
if (rc <= 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- int error = SSL_get_error(ssl_, rc);
+ errno_copy = THRIFT_GET_SOCKET_ERROR;
+ error = SSL_get_error(ssl_, rc);
switch (error) {
case SSL_ERROR_SYSCALL:
if ((errno_copy != THRIFT_EINTR)
@@ -635,10 +640,9 @@
} while (rc == 2);
}
if (rc <= 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
string fname(server() ? "SSL_accept" : "SSL_connect");
string errors;
- buildErrors(errors, errno_copy);
+ buildErrors(errors, errno_copy, error);
throw TSSLException(fname + ": " + errors);
}
authorize();
@@ -975,7 +979,7 @@
}
// extract error messages from error queue
-void buildErrors(string& errors, int errno_copy) {
+void buildErrors(string& errors, int errno_copy, int sslerrno) {
unsigned long errorCode;
char message[256];
@@ -999,6 +1003,9 @@
if (errors.empty()) {
errors = "error code: " + to_string(errno_copy);
}
+ if (sslerrno) {
+ errors += " (SSL_error_code = " + to_string(sslerrno) + ")";
+ }
}
/**
diff --git a/lib/csharp/test/ThriftMVCTest/Controllers/HomeController.cs b/lib/csharp/test/ThriftMVCTest/Controllers/HomeController.cs
index ab9eada..c9a1ec4 100644
--- a/lib/csharp/test/ThriftMVCTest/Controllers/HomeController.cs
+++ b/lib/csharp/test/ThriftMVCTest/Controllers/HomeController.cs
@@ -41,9 +41,8 @@
SecondService.IAsync asyncService =
new SecondService.Client(new TBinaryProtocol(new THttpClient(new Uri(baseUri, "Async.thrift"))));
- await asyncService.blahBlahAsync();
var result = await asyncService.secondtestStringAsync("TestString");
- if (result != "TestString")
+ if (result != "testString(\"TestString\")")
{
throw new Exception("The wrong result was returned");
}
@@ -59,9 +58,8 @@
SecondService.ISync service =
new SecondService.Client(new TBinaryProtocol(new THttpClient(new Uri(baseUri, "Sync.thrift"))));
- service.blahBlah();
var result = service.secondtestString("TestString");
- if (result != "TestString")
+ if (result != "testString(\"TestString\")")
{
throw new Exception("The wrong result was returned");
}
@@ -69,4 +67,4 @@
return RedirectToAction("Index");
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/csharp/test/ThriftMVCTest/SecondServiceImpl.cs b/lib/csharp/test/ThriftMVCTest/SecondServiceImpl.cs
index dce0148..fad301a 100644
--- a/lib/csharp/test/ThriftMVCTest/SecondServiceImpl.cs
+++ b/lib/csharp/test/ThriftMVCTest/SecondServiceImpl.cs
@@ -24,24 +24,14 @@
{
public class SecondServiceImpl : SecondService.IAsync, SecondService.ISync
{
- public Task blahBlahAsync()
- {
- return Task.FromResult(0);
- }
-
public Task<string> secondtestStringAsync(string thing)
{
return Task.FromResult(thing);
}
- public void blahBlah()
- {
-
- }
-
public string secondtestString(string thing)
{
- return thing;
+ return "testString(\"" + thing + "\")";
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/erl/test/thrift_test_test.erl b/lib/erl/test/thrift_test_test.erl
index ae20f31..77df61d 100644
--- a/lib/erl/test/thrift_test_test.erl
+++ b/lib/erl/test/thrift_test_test.erl
@@ -628,18 +628,6 @@
{struct, []},
thrift_test_thrift:function_info(testOneway, exceptions)
)},
- {"blahBlah params", ?_assertEqual(
- {struct, []},
- second_service_thrift:function_info(blahBlah, params_type)
- )},
- {"blahBlah reply", ?_assertEqual(
- {struct, []},
- second_service_thrift:function_info(blahBlah, reply_type)
- )},
- {"blahBlah exceptions", ?_assertEqual(
- {struct, []},
- second_service_thrift:function_info(blahBlah, exceptions)
- )},
{"secondtestString params", ?_assertEqual(
{struct, [{1, string}]},
second_service_thrift:function_info(secondtestString, params_type)
diff --git a/lib/java/test/org/apache/thrift/test/TestClient.java b/lib/java/test/org/apache/thrift/test/TestClient.java
index b2ce1e6..feaa972 100644
--- a/lib/java/test/org/apache/thrift/test/TestClient.java
+++ b/lib/java/test/org/apache/thrift/test/TestClient.java
@@ -33,6 +33,7 @@
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TSimpleJSONProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
@@ -46,6 +47,7 @@
// Generated code
import thrift.test.Insanity;
import thrift.test.Numberz;
+import thrift.test.SecondService;
import thrift.test.ThriftTest;
import thrift.test.Xception;
import thrift.test.Xception2;
@@ -64,6 +66,7 @@
private static int ERR_STRUCTS = 2;
private static int ERR_CONTAINERS = 4;
private static int ERR_EXCEPTIONS = 8;
+ private static int ERR_PROTOCOLS = 16;
private static int ERR_UNKNOWN = 64;
public static void main(String [] args) {
@@ -102,7 +105,7 @@
System.out.println(" --host=arg (=" + host + ")\tHost to connect");
System.out.println(" --port=arg (=" + port + ")\tPort number to connect");
System.out.println(" --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed, http");
- System.out.println(" --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact");
+ System.out.println(" --protocol=arg (=" + protocol_type + ")\tProtocol: binary, compact, json, multi, multic, multij");
System.out.println(" --ssl\t\t\tEncrypted Transport using SSL");
System.out.println(" --testloops[--n]=arg (=" + numTests + ")\tNumber of Tests");
System.exit(0);
@@ -117,6 +120,9 @@
if (protocol_type.equals("binary")) {
} else if (protocol_type.equals("compact")) {
} else if (protocol_type.equals("json")) {
+ } else if (protocol_type.equals("multi")) {
+ } else if (protocol_type.equals("multic")) {
+ } else if (protocol_type.equals("multij")) {
} else {
throw new Exception("Unknown protocol type! " + protocol_type);
}
@@ -163,16 +169,21 @@
}
TProtocol tProtocol = null;
- if (protocol_type.equals("json")) {
+ TProtocol tProtocol2 = null;
+ if (protocol_type.equals("json") || protocol_type.equals("multij")) {
tProtocol = new TJSONProtocol(transport);
- } else if (protocol_type.equals("compact")) {
+ } else if (protocol_type.equals("compact") || protocol_type.equals("multic")) {
tProtocol = new TCompactProtocol(transport);
} else {
tProtocol = new TBinaryProtocol(transport);
}
- ThriftTest.Client testClient =
- new ThriftTest.Client(tProtocol);
+ if (protocol_type.startsWith("multi")) {
+ tProtocol2 = new TMultiplexedProtocol(tProtocol, "SecondService");
+ tProtocol = new TMultiplexedProtocol(tProtocol, "ThriftTest");
+ }
+
+ ThriftTest.Client testClient = new ThriftTest.Client(tProtocol);
Insanity insane = new Insanity();
long timeMin = 0;
@@ -223,6 +234,19 @@
}
/**
+ * Multiplexed test
+ */
+ if (protocol_type.startsWith("multi")) {
+ SecondService.Client secondClient = new SecondService.Client(tProtocol2);
+ System.out.print("secondtestString(\"Test2\")");
+ s = secondClient.secondtestString("Test2");
+ System.out.print(" = \"" + s + "\"\n");
+ if (!s.equals("testString(\"Test2\")")) {
+ returnCode |= ERR_PROTOCOLS;
+ System.out.println("*** FAILURE ***\n");
+ }
+ }
+ /**
* BYTE TEST
*/
System.out.print("testByte(1)");
diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java
index 71b86c3..1f3e555 100644
--- a/lib/java/test/org/apache/thrift/test/TestServer.java
+++ b/lib/java/test/org/apache/thrift/test/TestServer.java
@@ -73,10 +73,6 @@
static class SecondHandler implements thrift.test.SecondService.Iface {
@Override
- public void blahBlah() throws org.apache.thrift.TException
- { throw new org.apache.thrift.TException("blahBlah"); }
-
- @Override
public java.lang.String secondtestString(java.lang.String thing) throws org.apache.thrift.TException
{ return "testString(\"" + thing + "\")"; }
diff --git a/lib/nodejs/test/client.js b/lib/nodejs/test/client.js
index 9609518..006fad2 100644
--- a/lib/nodejs/test/client.js
+++ b/lib/nodejs/test/client.js
@@ -108,7 +108,7 @@
connection.on('connect', function() {
secondclient.secondtestString("Test", function(err, response) {
assert(!err);
- assert.equal("Test", response);
+ assert.equal("testString(\"Test\")", response);
});
runTests();
diff --git a/lib/nodejs/test/server.js b/lib/nodejs/test/server.js
index bad3b17..8f2e06b 100644
--- a/lib/nodejs/test/server.js
+++ b/lib/nodejs/test/server.js
@@ -72,8 +72,8 @@
if (type === 'multiplex') {
var SecondServiceHandler = {
secondtestString: function(thing, result) {
- console.log('testString(\'' + thing + '\')');
- result(null, thing);
+ console.log('testString("' + thing + '")');
+ result(null, 'testString("' + thing + '")');
}
};