THRIFT-3877: cpp http server buffering bug oneway
Client: C++
This closes #1418
C++ HTTP server, hit with oneway RPC, then roundtrip RPC, no longer
hangs, as demonstrated by OneWayHTTPTest.
Unit-test: Hit a C++ HTTP server with a oneway rpc, and the next RPC
will hang. This test-case elicits the failure (converts to
timeout-expiry).
diff --git a/lib/cpp/test/OneWayHTTPTest.cpp b/lib/cpp/test/OneWayHTTPTest.cpp
new file mode 100644
index 0000000..3fe63b6
--- /dev/null
+++ b/lib/cpp/test/OneWayHTTPTest.cpp
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <climits>
+#include <vector>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/protocol/TJSONProtocol.h>
+#include <thrift/server/TThreadedServer.h>
+#include <thrift/transport/THttpServer.h>
+#include <thrift/transport/THttpClient.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/stdcxx.h>
+#include <thrift/transport/TBufferTransports.h>
+#include "gen-cpp/OneWayService.h"
+
+BOOST_AUTO_TEST_SUITE(OneWayHTTPTest)
+
+using namespace apache::thrift;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TJSONProtocol;
+using apache::thrift::protocol::TJSONProtocolFactory;
+using apache::thrift::server::TThreadedServer;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::THttpServer;
+using apache::thrift::transport::THttpServerTransportFactory;
+using apache::thrift::transport::THttpClient;
+using apache::thrift::transport::TBufferedTransport;
+using apache::thrift::transport::TBufferedTransportFactory;
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
+using apache::thrift::stdcxx::shared_ptr;
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::string;
+namespace utf = boost::unit_test;
+
+// Define this env var to enable some logging (in case you need to debug)
+#undef ENABLE_STDERR_LOGGING
+
+class OneWayServiceHandler : public onewaytest::OneWayServiceIf {
+public:
+ OneWayServiceHandler() {}
+
+ void roundTripRPC() override {
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "roundTripRPC()" << endl;
+#endif
+ }
+ void oneWayRPC() {
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "oneWayRPC()" << std::endl ;
+#endif
+ }
+};
+
+class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory {
+ public:
+ virtual ~OneWayServiceCloneFactory() {}
+ virtual onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
+ {
+ (void)connInfo ;
+ return new OneWayServiceHandler;
+ }
+ virtual void releaseHandler( onewaytest::OneWayServiceIf* handler) {
+ delete handler;
+ }
+};
+
+class RPC0ThreadClass {
+public:
+ RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor
+~RPC0ThreadClass() { } // Destructor
+
+void Run() {
+ server_.serve() ;
+}
+ TThreadedServer& server_ ;
+} ;
+
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Synchronized;
+
+// copied from IntegrationTest
+class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
+public:
+ TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
+ virtual ~TServerReadyEventHandler() {}
+ virtual void preServe() {
+ Synchronized sync(*this);
+ isListening_ = true;
+ notify();
+ }
+ virtual void* createContext(shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) {
+ Synchronized sync(*this);
+ ++accepted_;
+ notify();
+
+ (void)input;
+ (void)output;
+ return NULL;
+ }
+ bool isListening() const { return isListening_; }
+ uint64_t acceptedCount() const { return accepted_; }
+
+private:
+ bool isListening_;
+ uint64_t accepted_;
+};
+
+class TBlockableBufferedTransport : public TBufferedTransport {
+ public:
+ TBlockableBufferedTransport(stdcxx::shared_ptr<TTransport> transport)
+ : TBufferedTransport(transport, 10240),
+ blocked_(false) {
+ }
+
+ uint32_t write_buffer_length() {
+ uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
+ return have_bytes ;
+ }
+
+ void block() {
+ blocked_ = true ;
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "block flushing\n" ;
+#endif
+ }
+ void unblock() {
+ blocked_ = false ;
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ;
+#endif
+ }
+
+ void flush() override {
+ if (blocked_) {
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "flush was blocked\n" ;
+#endif
+ return ;
+ }
+ TBufferedTransport::flush() ;
+ }
+
+ bool blocked_ ;
+} ;
+
+BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP )
+{
+ stdcxx::shared_ptr<TServerSocket> ss = stdcxx::make_shared<TServerSocket>(0) ;
+ TThreadedServer server(
+ stdcxx::make_shared<onewaytest::OneWayServiceProcessorFactory>(stdcxx::make_shared<OneWayServiceCloneFactory>()),
+ ss, //port
+ stdcxx::make_shared<THttpServerTransportFactory>(),
+ stdcxx::make_shared<TJSONProtocolFactory>());
+
+ stdcxx::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ;
+ server.setServerEventHandler(pEventHandler);
+
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "Starting the server...\n";
+#endif
+ RPC0ThreadClass t(server) ;
+ boost::thread thread(&RPC0ThreadClass::Run, &t);
+
+ {
+ Synchronized sync(*(pEventHandler.get()));
+ while (!pEventHandler->isListening()) {
+ pEventHandler->wait();
+ }
+ }
+
+ int port = ss->getPort() ;
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "port " << port << endl ;
+#endif
+
+ {
+ stdcxx::shared_ptr<TSocket> socket(new TSocket("localhost", port));
+ socket->setRecvTimeout(10000) ; // 1000msec should be enough
+ stdcxx::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket));
+ stdcxx::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service"));
+ stdcxx::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport));
+ onewaytest::OneWayServiceClient client(protocol);
+
+
+ transport->open();
+ client.roundTripRPC();
+ blockable_transport->block() ;
+ uint32_t size0 = blockable_transport->write_buffer_length() ;
+ client.send_oneWayRPC() ;
+ uint32_t size1 = blockable_transport->write_buffer_length() ;
+ client.send_oneWayRPC() ;
+ uint32_t size2 = blockable_transport->write_buffer_length() ;
+ BOOST_CHECK((size1 - size0) == (size2 - size1)) ;
+ blockable_transport->unblock() ;
+ client.send_roundTripRPC() ;
+ blockable_transport->flush() ;
+ try {
+ client.recv_roundTripRPC() ;
+ } catch (TTransportException e) {
+ BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ;
+ }
+ transport->close();
+ }
+ server.stop();
+ thread.join() ;
+#ifdef ENABLE_STDERR_LOGGING
+ cerr << "finished.\n";
+#endif
+}
+
+BOOST_AUTO_TEST_SUITE_END()