THRIFT-3877: fix py/py3 server, java client with http transport
The java TestClient asks the server to runa oneway request that
sleeps for 3 seconds. If the java TestClient sees the duration
of the call exceed one second, it fails the test. This means the
server did not participate in the "fire and forget" dynamics of
ONEWAY requests. In this case the THttpServer was processing the
RPC before sending the transport response. The fix was to enhance
the TProcessor so that the THttpServer has an opportunity to inspect
the message header before processing the RPC.
This is partly due to the violation of the THttpServer in the
layered architecture. It is essentially implementing a combined
server and transport, whereas there should be a distinct server,
protocol, and transport separation. Many languages seem to have
this problem where HTTP was introduced.
diff --git a/compiler/cpp/src/thrift/generate/t_py_generator.cc b/compiler/cpp/src/thrift/generate/t_py_generator.cc
index c16d6d3..83462f4 100644
--- a/compiler/cpp/src/thrift/generate/t_py_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_py_generator.cc
@@ -1831,6 +1831,13 @@
f_service_ << indent() << "self._processMap[\"" << (*f_iter)->get_name()
<< "\"] = Processor.process_" << (*f_iter)->get_name() << endl;
}
+ f_service_ << indent() << "self._on_message_begin = None" << endl;
+ indent_down();
+ f_service_ << endl;
+
+ f_service_ << indent() << "def on_message_begin(self, func):" << endl;
+ indent_up();
+ f_service_ << indent() << "self._on_message_begin = func" << endl;
indent_down();
f_service_ << endl;
@@ -1839,6 +1846,10 @@
indent_up();
f_service_ << indent() << "(name, type, seqid) = iprot.readMessageBegin()" << endl;
+ f_service_ << indent() << "if self._on_message_begin:" << endl;
+ indent_up();
+ f_service_ << indent() << "self._on_message_begin(name, type, seqid)" << endl;
+ indent_down();
// TODO(mcslee): validate message
diff --git a/lib/java/test/org/apache/thrift/test/TestClient.java b/lib/java/test/org/apache/thrift/test/TestClient.java
index feaa972..84410ce 100644
--- a/lib/java/test/org/apache/thrift/test/TestClient.java
+++ b/lib/java/test/org/apache/thrift/test/TestClient.java
@@ -752,13 +752,18 @@
testClient.testOneway(3);
long onewayElapsedMillis = (System.nanoTime() - startOneway) / 1000000;
if (onewayElapsedMillis > 200) {
- System.out.println("Oneway test failed: took " +
+ System.out.println("Oneway test took too long to execute failed: took " +
Long.toString(onewayElapsedMillis) +
"ms");
- System.out.printf("*** FAILURE ***\n");
+ System.out.println("oneway calls are 'fire and forget' and therefore should not cause blocking.");
+ System.out.println("Some transports (HTTP) have a required response, and typically this failure");
+ System.out.println("means the transport response was delayed until after the execution");
+ System.out.println("of the RPC. The server should post the transport response immediately and");
+ System.out.println("before executing the RPC.");
+ System.out.println("*** FAILURE ***");
returnCode |= ERR_BASETYPES;
} else {
- System.out.println("Success - took " +
+ System.out.println("Success - fire and forget only took " +
Long.toString(onewayElapsedMillis) +
"ms");
}
diff --git a/lib/py/src/TMultiplexedProcessor.py b/lib/py/src/TMultiplexedProcessor.py
index bd10d9b..ff88430 100644
--- a/lib/py/src/TMultiplexedProcessor.py
+++ b/lib/py/src/TMultiplexedProcessor.py
@@ -39,6 +39,10 @@
def registerProcessor(self, serviceName, processor):
self.services[serviceName] = processor
+ def on_message_begin(self, func):
+ for key in self.services.keys():
+ self.services[key].on_message_begin(func)
+
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
if type != TMessageType.CALL and type != TMessageType.ONEWAY:
diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py
index 00941d8..c390cbb 100644
--- a/lib/py/src/Thrift.py
+++ b/lib/py/src/Thrift.py
@@ -72,6 +72,18 @@
"""Base class for processor, which works on two streams."""
def process(self, iprot, oprot):
+ """
+ Process a request. The normal behvaior is to have the
+ processor invoke the correct handler and then it is the
+ server's responsibility to write the response to oprot.
+ """
+ pass
+
+ def on_message_begin(self, func):
+ """
+ Install a callback that receives (name, type, seqid)
+ after the message header is read.
+ """
pass
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
index 85cf400..47e817d 100644
--- a/lib/py/src/server/THttpServer.py
+++ b/lib/py/src/server/THttpServer.py
@@ -21,6 +21,7 @@
from six.moves import BaseHTTPServer
+from thrift.Thrift import TMessageType
from thrift.server import TServer
from thrift.transport import TTransport
@@ -32,7 +33,9 @@
to override this behavior (e.g., to simulate a misconfigured or
overloaded web server during testing), it can raise a ResponseException.
The function passed to the constructor will be called with the
- RequestHandler as its only argument.
+ RequestHandler as its only argument. Note that this is irrelevant
+ for ONEWAY requests, as the HTTP response must be sent before the
+ RPC is processed.
"""
def __init__(self, handler):
self.handler = handler
@@ -43,6 +46,9 @@
This class is not very performant, but it is useful (for example) for
acting as a mock version of an Apache-based PHP Thrift endpoint.
+ Also important to note the HTTP implementation pretty much violates the
+ transport/protocol/processor/server layering, by performing the transport
+ functions here. This means things like oneway handling are oddly exposed.
"""
def __init__(self,
processor,
@@ -68,26 +74,45 @@
inputProtocolFactory, outputProtocolFactory)
thttpserver = self
+ self._replied = None
class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
# Don't care about the request path.
- itrans = TTransport.TFileObjectTransport(self.rfile)
- otrans = TTransport.TFileObjectTransport(self.wfile)
+ thttpserver._replied = False
+ iftrans = TTransport.TFileObjectTransport(self.rfile)
itrans = TTransport.TBufferedTransport(
- itrans, int(self.headers['Content-Length']))
+ iftrans, int(self.headers['Content-Length']))
otrans = TTransport.TMemoryBuffer()
iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
try:
+ thttpserver.processor.on_message_begin(self.on_begin)
thttpserver.processor.process(iprot, oprot)
except ResponseException as exn:
exn.handler(self)
else:
+ if not thttpserver._replied:
+ # If the request was ONEWAY we would have replied already
+ data = otrans.getvalue()
+ self.send_response(200)
+ self.send_header("Content-Length", len(data))
+ self.send_header("Content-Type", "application/x-thrift")
+ self.end_headers()
+ self.wfile.write(data)
+
+ def on_begin(self, name, type, seqid):
+ """
+ Inspect the message header.
+
+ This allows us to post an immediate transport response
+ if the request is a ONEWAY message type.
+ """
+ if type == TMessageType.ONEWAY:
self.send_response(200)
- self.send_header("content-type", "application/x-thrift")
+ self.send_header("Content-Type", "application/x-thrift")
self.end_headers()
- self.wfile.write(otrans.getvalue())
+ thttpserver._replied = True
self.httpd = server_class(server_address, RequestHander)
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index dd7fb6b..5beaa58 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -452,35 +452,20 @@
"py-hs_compact_http-ip",
"py-hs_header_http-ip",
"py-hs_json_http-ip",
- "py-java_accel-binary_http-ip",
"py-java_accel-binary_http-ip-ssl",
- "py-java_accelc-compact_http-ip",
"py-java_accelc-compact_http-ip-ssl",
- "py-java_binary_http-ip",
"py-java_binary_http-ip-ssl",
- "py-java_compact_http-ip",
"py-java_compact_http-ip-ssl",
- "py-java_json_http-ip",
"py-java_json_http-ip-ssl",
- "py-java_multi-binary_http-ip",
"py-java_multi-binary_http-ip-ssl",
- "py-java_multi_http-ip",
"py-java_multi_http-ip-ssl",
- "py-java_multia-binary_http-ip",
"py-java_multia-binary_http-ip-ssl",
- "py-java_multia-multi_http-ip",
"py-java_multia-multi_http-ip-ssl",
- "py-java_multiac-compact_http-ip",
"py-java_multiac-compact_http-ip-ssl",
- "py-java_multiac-multic_http-ip",
"py-java_multiac-multic_http-ip-ssl",
- "py-java_multic-compact_http-ip",
"py-java_multic-compact_http-ip-ssl",
- "py-java_multic_http-ip",
"py-java_multic_http-ip-ssl",
- "py-java_multij-json_http-ip",
"py-java_multij-json_http-ip-ssl",
- "py-java_multij_http-ip",
"py-java_multij_http-ip-ssl",
"py-lua_accel-binary_http-ip",
"py-lua_accelc-compact_http-ip",
@@ -564,35 +549,20 @@
"py3-hs_compact_http-ip",
"py3-hs_header_http-ip",
"py3-hs_json_http-ip",
- "py3-java_accel-binary_http-ip",
"py3-java_accel-binary_http-ip-ssl",
- "py3-java_accelc-compact_http-ip",
"py3-java_accelc-compact_http-ip-ssl",
- "py3-java_binary_http-ip",
"py3-java_binary_http-ip-ssl",
- "py3-java_compact_http-ip",
"py3-java_compact_http-ip-ssl",
- "py3-java_json_http-ip",
"py3-java_json_http-ip-ssl",
- "py3-java_multi-binary_http-ip",
"py3-java_multi-binary_http-ip-ssl",
- "py3-java_multi_http-ip",
"py3-java_multi_http-ip-ssl",
- "py3-java_multia-binary_http-ip",
"py3-java_multia-binary_http-ip-ssl",
- "py3-java_multia-multi_http-ip",
"py3-java_multia-multi_http-ip-ssl",
- "py3-java_multiac-compact_http-ip",
"py3-java_multiac-compact_http-ip-ssl",
- "py3-java_multiac-multic_http-ip",
"py3-java_multiac-multic_http-ip-ssl",
- "py3-java_multic-compact_http-ip",
"py3-java_multic-compact_http-ip-ssl",
- "py3-java_multic_http-ip",
"py3-java_multic_http-ip-ssl",
- "py3-java_multij-json_http-ip",
"py3-java_multij-json_http-ip-ssl",
- "py3-java_multij_http-ip",
"py3-java_multij_http-ip-ssl",
"py3-lua_accel-binary_http-ip",
"py3-lua_accelc-compact_http-ip",
@@ -613,4 +583,4 @@
"rb-cpp_json_framed-domain",
"rb-cpp_json_framed-ip",
"rb-cpp_json_framed-ip-ssl"
-]
\ No newline at end of file
+]