blob: 3fe63b612a4072b3f2fd988454bbf19599b9bb28 [file] [log] [blame]
Chet Murthyad08a8b2017-12-19 15:55:56 -08001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <boost/test/auto_unit_test.hpp>
21#include <boost/thread.hpp>
22#include <iostream>
23#include <climits>
24#include <vector>
25#include <thrift/concurrency/Monitor.h>
26#include <thrift/protocol/TBinaryProtocol.h>
27#include <thrift/protocol/TJSONProtocol.h>
28#include <thrift/server/TThreadedServer.h>
29#include <thrift/transport/THttpServer.h>
30#include <thrift/transport/THttpClient.h>
31#include <thrift/transport/TServerSocket.h>
32#include <thrift/transport/TSocket.h>
33#include <thrift/stdcxx.h>
34#include <thrift/transport/TBufferTransports.h>
35#include "gen-cpp/OneWayService.h"
36
37BOOST_AUTO_TEST_SUITE(OneWayHTTPTest)
38
39using namespace apache::thrift;
40using apache::thrift::protocol::TProtocol;
41using apache::thrift::protocol::TBinaryProtocol;
42using apache::thrift::protocol::TBinaryProtocolFactory;
43using apache::thrift::protocol::TJSONProtocol;
44using apache::thrift::protocol::TJSONProtocolFactory;
45using apache::thrift::server::TThreadedServer;
46using apache::thrift::server::TServerEventHandler;
47using apache::thrift::transport::TTransport;
48using apache::thrift::transport::THttpServer;
49using apache::thrift::transport::THttpServerTransportFactory;
50using apache::thrift::transport::THttpClient;
51using apache::thrift::transport::TBufferedTransport;
52using apache::thrift::transport::TBufferedTransportFactory;
53using apache::thrift::transport::TMemoryBuffer;
54using apache::thrift::transport::TServerSocket;
55using apache::thrift::transport::TSocket;
56using apache::thrift::transport::TTransportException;
57using apache::thrift::stdcxx::shared_ptr;
58using std::cout;
59using std::cerr;
60using std::endl;
61using std::string;
62namespace utf = boost::unit_test;
63
// Define this preprocessor macro (change the #undef below to #define) to enable stderr logging when debugging
65#undef ENABLE_STDERR_LOGGING
66
67class OneWayServiceHandler : public onewaytest::OneWayServiceIf {
68public:
69 OneWayServiceHandler() {}
70
71 void roundTripRPC() override {
72#ifdef ENABLE_STDERR_LOGGING
73 cerr << "roundTripRPC()" << endl;
74#endif
75 }
76 void oneWayRPC() {
77#ifdef ENABLE_STDERR_LOGGING
78 cerr << "oneWayRPC()" << std::endl ;
79#endif
80 }
81};
82
83class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory {
84 public:
85 virtual ~OneWayServiceCloneFactory() {}
86 virtual onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
87 {
88 (void)connInfo ;
89 return new OneWayServiceHandler;
90 }
91 virtual void releaseHandler( onewaytest::OneWayServiceIf* handler) {
92 delete handler;
93 }
94};
95
96class RPC0ThreadClass {
97public:
98 RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor
99~RPC0ThreadClass() { } // Destructor
100
101void Run() {
102 server_.serve() ;
103}
104 TThreadedServer& server_ ;
105} ;
106
107using apache::thrift::concurrency::Monitor;
108using apache::thrift::concurrency::Mutex;
109using apache::thrift::concurrency::Synchronized;
110
111// copied from IntegrationTest
112class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
113public:
114 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
115 virtual ~TServerReadyEventHandler() {}
116 virtual void preServe() {
117 Synchronized sync(*this);
118 isListening_ = true;
119 notify();
120 }
121 virtual void* createContext(shared_ptr<TProtocol> input,
122 shared_ptr<TProtocol> output) {
123 Synchronized sync(*this);
124 ++accepted_;
125 notify();
126
127 (void)input;
128 (void)output;
129 return NULL;
130 }
131 bool isListening() const { return isListening_; }
132 uint64_t acceptedCount() const { return accepted_; }
133
134private:
135 bool isListening_;
136 uint64_t accepted_;
137};
138
139class TBlockableBufferedTransport : public TBufferedTransport {
140 public:
141 TBlockableBufferedTransport(stdcxx::shared_ptr<TTransport> transport)
142 : TBufferedTransport(transport, 10240),
143 blocked_(false) {
144 }
145
146 uint32_t write_buffer_length() {
147 uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
148 return have_bytes ;
149 }
150
151 void block() {
152 blocked_ = true ;
153#ifdef ENABLE_STDERR_LOGGING
154 cerr << "block flushing\n" ;
155#endif
156 }
157 void unblock() {
158 blocked_ = false ;
159#ifdef ENABLE_STDERR_LOGGING
160 cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ;
161#endif
162 }
163
164 void flush() override {
165 if (blocked_) {
166#ifdef ENABLE_STDERR_LOGGING
167 cerr << "flush was blocked\n" ;
168#endif
169 return ;
170 }
171 TBufferedTransport::flush() ;
172 }
173
174 bool blocked_ ;
175} ;
176
177BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP )
178{
179 stdcxx::shared_ptr<TServerSocket> ss = stdcxx::make_shared<TServerSocket>(0) ;
180 TThreadedServer server(
181 stdcxx::make_shared<onewaytest::OneWayServiceProcessorFactory>(stdcxx::make_shared<OneWayServiceCloneFactory>()),
182 ss, //port
183 stdcxx::make_shared<THttpServerTransportFactory>(),
184 stdcxx::make_shared<TJSONProtocolFactory>());
185
186 stdcxx::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ;
187 server.setServerEventHandler(pEventHandler);
188
189#ifdef ENABLE_STDERR_LOGGING
190 cerr << "Starting the server...\n";
191#endif
192 RPC0ThreadClass t(server) ;
193 boost::thread thread(&RPC0ThreadClass::Run, &t);
194
195 {
196 Synchronized sync(*(pEventHandler.get()));
197 while (!pEventHandler->isListening()) {
198 pEventHandler->wait();
199 }
200 }
201
202 int port = ss->getPort() ;
203#ifdef ENABLE_STDERR_LOGGING
204 cerr << "port " << port << endl ;
205#endif
206
207 {
208 stdcxx::shared_ptr<TSocket> socket(new TSocket("localhost", port));
209 socket->setRecvTimeout(10000) ; // 1000msec should be enough
210 stdcxx::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket));
211 stdcxx::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service"));
212 stdcxx::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport));
213 onewaytest::OneWayServiceClient client(protocol);
214
215
216 transport->open();
217 client.roundTripRPC();
218 blockable_transport->block() ;
219 uint32_t size0 = blockable_transport->write_buffer_length() ;
220 client.send_oneWayRPC() ;
221 uint32_t size1 = blockable_transport->write_buffer_length() ;
222 client.send_oneWayRPC() ;
223 uint32_t size2 = blockable_transport->write_buffer_length() ;
224 BOOST_CHECK((size1 - size0) == (size2 - size1)) ;
225 blockable_transport->unblock() ;
226 client.send_roundTripRPC() ;
227 blockable_transport->flush() ;
228 try {
229 client.recv_roundTripRPC() ;
230 } catch (TTransportException e) {
231 BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ;
232 }
233 transport->close();
234 }
235 server.stop();
236 thread.join() ;
237#ifdef ENABLE_STDERR_LOGGING
238 cerr << "finished.\n";
239#endif
240}
241
242BOOST_AUTO_TEST_SUITE_END()