blob: 8789c2c8ffb50769061ec548dc20d07b5cce5316 [file] [log] [blame]
Chet Murthyad08a8b2017-12-19 15:55:56 -08001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
zeshuai0076b1cb302020-05-27 12:08:01 +080020#include <boost/test/unit_test.hpp>
Chet Murthyad08a8b2017-12-19 15:55:56 -080021#include <boost/thread.hpp>
22#include <iostream>
23#include <climits>
24#include <vector>
25#include <thrift/concurrency/Monitor.h>
26#include <thrift/protocol/TBinaryProtocol.h>
27#include <thrift/protocol/TJSONProtocol.h>
28#include <thrift/server/TThreadedServer.h>
29#include <thrift/transport/THttpServer.h>
30#include <thrift/transport/THttpClient.h>
31#include <thrift/transport/TServerSocket.h>
32#include <thrift/transport/TSocket.h>
cyy316723a2019-01-05 16:35:14 +080033#include <memory>
Chet Murthyad08a8b2017-12-19 15:55:56 -080034#include <thrift/transport/TBufferTransports.h>
35#include "gen-cpp/OneWayService.h"
36
37BOOST_AUTO_TEST_SUITE(OneWayHTTPTest)
38
39using namespace apache::thrift;
40using apache::thrift::protocol::TProtocol;
41using apache::thrift::protocol::TBinaryProtocol;
42using apache::thrift::protocol::TBinaryProtocolFactory;
43using apache::thrift::protocol::TJSONProtocol;
44using apache::thrift::protocol::TJSONProtocolFactory;
45using apache::thrift::server::TThreadedServer;
46using apache::thrift::server::TServerEventHandler;
47using apache::thrift::transport::TTransport;
48using apache::thrift::transport::THttpServer;
49using apache::thrift::transport::THttpServerTransportFactory;
50using apache::thrift::transport::THttpClient;
51using apache::thrift::transport::TBufferedTransport;
52using apache::thrift::transport::TBufferedTransportFactory;
53using apache::thrift::transport::TMemoryBuffer;
54using apache::thrift::transport::TServerSocket;
55using apache::thrift::transport::TSocket;
56using apache::thrift::transport::TTransportException;
cyy316723a2019-01-05 16:35:14 +080057using std::shared_ptr;
Chet Murthyad08a8b2017-12-19 15:55:56 -080058using std::string;
59namespace utf = boost::unit_test;
60
// Define this preprocessor macro (change the #undef below to a #define) to enable some stderr logging (in case you need to debug)
62#undef ENABLE_STDERR_LOGGING
63
64class OneWayServiceHandler : public onewaytest::OneWayServiceIf {
65public:
Sebastian Zenker042580f2019-01-29 15:48:12 +010066 OneWayServiceHandler() = default;
Chet Murthyad08a8b2017-12-19 15:55:56 -080067
68 void roundTripRPC() override {
69#ifdef ENABLE_STDERR_LOGGING
CJCombrink4a280d52024-03-14 19:57:41 +010070 cerr << "roundTripRPC()" << '\n';
Chet Murthyad08a8b2017-12-19 15:55:56 -080071#endif
72 }
Sebastian Zenker042580f2019-01-29 15:48:12 +010073 void oneWayRPC() override {
Chet Murthyad08a8b2017-12-19 15:55:56 -080074#ifdef ENABLE_STDERR_LOGGING
CJCombrink4a280d52024-03-14 19:57:41 +010075 cerr << "oneWayRPC()" << '\n';
Chet Murthyad08a8b2017-12-19 15:55:56 -080076#endif
77 }
78};
79
80class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory {
81 public:
Sebastian Zenker042580f2019-01-29 15:48:12 +010082 ~OneWayServiceCloneFactory() override = default;
83 onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
Chet Murthyad08a8b2017-12-19 15:55:56 -080084 {
85 (void)connInfo ;
86 return new OneWayServiceHandler;
87 }
Sebastian Zenker042580f2019-01-29 15:48:12 +010088 void releaseHandler( onewaytest::OneWayServiceIf* handler) override {
Chet Murthyad08a8b2017-12-19 15:55:56 -080089 delete handler;
90 }
91};
92
93class RPC0ThreadClass {
94public:
95 RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor
Sebastian Zenker042580f2019-01-29 15:48:12 +010096~RPC0ThreadClass() = default; // Destructor
Chet Murthyad08a8b2017-12-19 15:55:56 -080097
98void Run() {
99 server_.serve() ;
100}
101 TThreadedServer& server_ ;
102} ;
103
104using apache::thrift::concurrency::Monitor;
105using apache::thrift::concurrency::Mutex;
106using apache::thrift::concurrency::Synchronized;
107
108// copied from IntegrationTest
109class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
110public:
111 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
Sebastian Zenker042580f2019-01-29 15:48:12 +0100112 ~TServerReadyEventHandler() override = default;
113 void preServe() override {
Chet Murthyad08a8b2017-12-19 15:55:56 -0800114 Synchronized sync(*this);
115 isListening_ = true;
116 notify();
117 }
Sebastian Zenker042580f2019-01-29 15:48:12 +0100118 void* createContext(shared_ptr<TProtocol> input,
119 shared_ptr<TProtocol> output) override {
Chet Murthyad08a8b2017-12-19 15:55:56 -0800120 Synchronized sync(*this);
121 ++accepted_;
122 notify();
123
124 (void)input;
125 (void)output;
Sebastian Zenker042580f2019-01-29 15:48:12 +0100126 return nullptr;
Chet Murthyad08a8b2017-12-19 15:55:56 -0800127 }
128 bool isListening() const { return isListening_; }
129 uint64_t acceptedCount() const { return accepted_; }
130
131private:
132 bool isListening_;
133 uint64_t accepted_;
134};
135
136class TBlockableBufferedTransport : public TBufferedTransport {
137 public:
cyy316723a2019-01-05 16:35:14 +0800138 TBlockableBufferedTransport(std::shared_ptr<TTransport> transport)
Chet Murthyad08a8b2017-12-19 15:55:56 -0800139 : TBufferedTransport(transport, 10240),
140 blocked_(false) {
141 }
142
143 uint32_t write_buffer_length() {
Sebastian Zenker042580f2019-01-29 15:48:12 +0100144 auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
Chet Murthyad08a8b2017-12-19 15:55:56 -0800145 return have_bytes ;
146 }
147
148 void block() {
149 blocked_ = true ;
150#ifdef ENABLE_STDERR_LOGGING
151 cerr << "block flushing\n" ;
152#endif
153 }
154 void unblock() {
155 blocked_ = false ;
156#ifdef ENABLE_STDERR_LOGGING
157 cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ;
158#endif
159 }
160
161 void flush() override {
162 if (blocked_) {
163#ifdef ENABLE_STDERR_LOGGING
164 cerr << "flush was blocked\n" ;
165#endif
166 return ;
167 }
168 TBufferedTransport::flush() ;
169 }
170
171 bool blocked_ ;
172} ;
173
174BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP )
175{
cyy316723a2019-01-05 16:35:14 +0800176 std::shared_ptr<TServerSocket> ss = std::make_shared<TServerSocket>(0) ;
Chet Murthyad08a8b2017-12-19 15:55:56 -0800177 TThreadedServer server(
cyy316723a2019-01-05 16:35:14 +0800178 std::make_shared<onewaytest::OneWayServiceProcessorFactory>(std::make_shared<OneWayServiceCloneFactory>()),
Chet Murthyad08a8b2017-12-19 15:55:56 -0800179 ss, //port
cyy316723a2019-01-05 16:35:14 +0800180 std::make_shared<THttpServerTransportFactory>(),
181 std::make_shared<TJSONProtocolFactory>());
Chet Murthyad08a8b2017-12-19 15:55:56 -0800182
cyy316723a2019-01-05 16:35:14 +0800183 std::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ;
Chet Murthyad08a8b2017-12-19 15:55:56 -0800184 server.setServerEventHandler(pEventHandler);
185
186#ifdef ENABLE_STDERR_LOGGING
187 cerr << "Starting the server...\n";
188#endif
189 RPC0ThreadClass t(server) ;
190 boost::thread thread(&RPC0ThreadClass::Run, &t);
191
192 {
193 Synchronized sync(*(pEventHandler.get()));
194 while (!pEventHandler->isListening()) {
195 pEventHandler->wait();
196 }
197 }
198
199 int port = ss->getPort() ;
200#ifdef ENABLE_STDERR_LOGGING
CJCombrink4a280d52024-03-14 19:57:41 +0100201 cerr << "port " << port << '\n';
Chet Murthyad08a8b2017-12-19 15:55:56 -0800202#endif
203
204 {
cyy316723a2019-01-05 16:35:14 +0800205 std::shared_ptr<TSocket> socket(new TSocket("localhost", port));
Chet Murthyad08a8b2017-12-19 15:55:56 -0800206 socket->setRecvTimeout(10000) ; // 1000msec should be enough
cyy316723a2019-01-05 16:35:14 +0800207 std::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket));
208 std::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service"));
209 std::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport));
Chet Murthyad08a8b2017-12-19 15:55:56 -0800210 onewaytest::OneWayServiceClient client(protocol);
211
212
213 transport->open();
214 client.roundTripRPC();
215 blockable_transport->block() ;
216 uint32_t size0 = blockable_transport->write_buffer_length() ;
217 client.send_oneWayRPC() ;
218 uint32_t size1 = blockable_transport->write_buffer_length() ;
219 client.send_oneWayRPC() ;
220 uint32_t size2 = blockable_transport->write_buffer_length() ;
221 BOOST_CHECK((size1 - size0) == (size2 - size1)) ;
222 blockable_transport->unblock() ;
223 client.send_roundTripRPC() ;
224 blockable_transport->flush() ;
225 try {
226 client.recv_roundTripRPC() ;
cyy9fed9012019-01-16 14:43:51 +0800227 } catch (const TTransportException &e) {
Chet Murthyad08a8b2017-12-19 15:55:56 -0800228 BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ;
229 }
230 transport->close();
231 }
232 server.stop();
233 thread.join() ;
234#ifdef ENABLE_STDERR_LOGGING
235 cerr << "finished.\n";
236#endif
237}
238
239BOOST_AUTO_TEST_SUITE_END()