blob: 73bcdbabf970b9c5ba5e85036c0c6ee993662088 [file] [log] [blame]
Ben Craig1684c422015-04-24 08:52:44 -05001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#define BOOST_TEST_MODULE TServerIntegrationTest
21#include <boost/test/auto_unit_test.hpp>
22#include <boost/bind.hpp>
Jim King79c99112015-04-30 07:10:08 -040023#include <boost/foreach.hpp>
Ben Craig1684c422015-04-24 08:52:44 -050024#include <boost/format.hpp>
25#include <boost/shared_ptr.hpp>
26#include <boost/thread.hpp>
Jim King79c99112015-04-30 07:10:08 -040027#include <thrift/server/TSimpleServer.h>
28#include <thrift/server/TThreadPoolServer.h>
Ben Craig1684c422015-04-24 08:52:44 -050029#include <thrift/server/TThreadedServer.h>
30#include <thrift/protocol/TBinaryProtocol.h>
31#include <thrift/transport/TServerSocket.h>
32#include <thrift/transport/TSocket.h>
33#include <thrift/transport/TTransport.h>
34#include "gen-cpp/ParentService.h"
35#include "TestPortFixture.h"
36#include <vector>
37
38using apache::thrift::concurrency::Guard;
39using apache::thrift::concurrency::Monitor;
40using apache::thrift::concurrency::Mutex;
41using apache::thrift::concurrency::Synchronized;
42using apache::thrift::protocol::TBinaryProtocol;
43using apache::thrift::protocol::TBinaryProtocolFactory;
44using apache::thrift::protocol::TProtocol;
45using apache::thrift::protocol::TProtocolFactory;
46using apache::thrift::transport::TServerSocket;
47using apache::thrift::transport::TServerTransport;
48using apache::thrift::transport::TSocket;
49using apache::thrift::transport::TTransport;
Jim King79c99112015-04-30 07:10:08 -040050using apache::thrift::transport::TTransportException;
Ben Craig1684c422015-04-24 08:52:44 -050051using apache::thrift::transport::TTransportFactory;
Jim King79c99112015-04-30 07:10:08 -040052using apache::thrift::server::TServer;
Ben Craig1684c422015-04-24 08:52:44 -050053using apache::thrift::server::TServerEventHandler;
Jim King79c99112015-04-30 07:10:08 -040054using apache::thrift::server::TSimpleServer;
55using apache::thrift::server::TThreadPoolServer;
Ben Craig1684c422015-04-24 08:52:44 -050056using apache::thrift::server::TThreadedServer;
57using apache::thrift::test::ParentServiceClient;
58using apache::thrift::test::ParentServiceIf;
59using apache::thrift::test::ParentServiceProcessor;
Jim King79c99112015-04-30 07:10:08 -040060using boost::posix_time::milliseconds;
Ben Craig1684c422015-04-24 08:52:44 -050061
62/**
63 * preServe runs after listen() is successful, when we can connect
64 */
65class TServerReadyEventHandler : public TServerEventHandler, public Monitor
66{
67public:
68 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
69 virtual ~TServerReadyEventHandler() {}
70 virtual void preServe() {
71 Synchronized sync(*this);
72 isListening_ = true;
73 notify();
74 }
75 virtual void* createContext(boost::shared_ptr<TProtocol> input,
76 boost::shared_ptr<TProtocol> output) {
77 Synchronized sync(*this);
78 ++accepted_;
79 notify();
80
81 (void)input;
82 (void)output;
83 return NULL;
84 }
85 bool isListening() const { return isListening_; }
86 uint64_t acceptedCount() const { return accepted_; }
87private:
88 bool isListening_;
89 uint64_t accepted_;
90};
91
Jim King79c99112015-04-30 07:10:08 -040092/**
93 * Reusing another generated test, just something to serve up
94 */
95class ParentHandler : public ParentServiceIf {
Ben Craig1684c422015-04-24 08:52:44 -050096public:
97 ParentHandler() : generation_(0) {}
98
99 int32_t incrementGeneration() {
100 Guard g(mutex_);
101 return ++generation_;
102 }
103
104 int32_t getGeneration() {
105 Guard g(mutex_);
106 return generation_;
107 }
108
109 void addString(const std::string& s) {
110 Guard g(mutex_);
111 strings_.push_back(s);
112 }
113
114 void getStrings(std::vector<std::string>& _return) {
115 Guard g(mutex_);
116 _return = strings_;
117 }
118
119 void getDataWait(std::string& _return, int32_t length) {
120 }
121
122 void onewayWait() {
123 }
124
125 void exceptionWait(const std::string& message) {
126 }
127
128 void unexpectedExceptionWait(const std::string& message) {
129 }
130
131protected:
132 Mutex mutex_;
133 int32_t generation_;
134 std::vector<std::string> strings_;
135};
136
Jim King79c99112015-04-30 07:10:08 -0400137void autoSocketCloser(TSocket *pSock) {
138 pSock->close();
139 delete pSock;
140}
141
142template<class TServerType>
Ben Craig1684c422015-04-24 08:52:44 -0500143class TServerIntegrationTestFixture : public TestPortFixture
144{
145public:
146 TServerIntegrationTestFixture() :
Jim King79c99112015-04-30 07:10:08 -0400147 pServer(new TServerType(
Ben Craig1684c422015-04-24 08:52:44 -0500148 boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
149 boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
150 boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
151 boost::shared_ptr<TTransportFactory>(new TTransportFactory),
152 boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
153 pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
154 {
155 pServer->setServerEventHandler(pEventHandler);
156 }
157
158 void startServer() {
Jim King79c99112015-04-30 07:10:08 -0400159 pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
Ben Craig1684c422015-04-24 08:52:44 -0500160
161 // block until listen() completes so clients will be able to connect
162 Synchronized sync(*(pEventHandler.get()));
163 while (!pEventHandler->isListening()) {
164 pEventHandler->wait();
165 }
166
167 BOOST_MESSAGE("server is listening");
168 }
169
170 void blockUntilAccepted(uint64_t numAccepted) {
171 Synchronized sync(*(pEventHandler.get()));
172 while (pEventHandler->acceptedCount() < numAccepted) {
173 pEventHandler->wait();
174 }
175
176 BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
177 }
178
179 void stopServer() {
Jim King79c99112015-04-30 07:10:08 -0400180 if (pServerThread) {
181 pServer->stop();
182 BOOST_MESSAGE("server stop completed");
183
184 pServerThread->join();
185 BOOST_MESSAGE("server thread joined");
186 pServerThread.reset();
187 }
Ben Craig1684c422015-04-24 08:52:44 -0500188 }
189
190 ~TServerIntegrationTestFixture() {
191 stopServer();
192 }
193
Jim King79c99112015-04-30 07:10:08 -0400194 void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
195 boost::this_thread::sleep(after);
Ben Craig1684c422015-04-24 08:52:44 -0500196 toClose->close();
197 }
198
Jim King79c99112015-04-30 07:10:08 -0400199 void baseline(int64_t numToMake, int64_t expectedHWM) {
200 startServer();
201 std::vector<boost::shared_ptr<TSocket> > holdSockets;
202 std::vector<boost::shared_ptr<boost::thread> > holdThreads;
203
204 for (int64_t i = 0; i < numToMake; ++i) {
205 boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
206 holdSockets.push_back(pClientSock);
207 boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
208 ParentServiceClient client(pClientProtocol);
209 pClientSock->open();
210 client.incrementGeneration();
211 holdThreads.push_back(
212 boost::shared_ptr<boost::thread>(
213 new boost::thread(
214 boost::bind(&TServerIntegrationTestFixture::delayClose, this,
215 pClientSock, milliseconds(100 * numToMake)))));
216 }
217
218 BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
219 stopServer();
220 BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
221 pThread->join();
222 }
223 holdThreads.clear();
224 holdSockets.clear();
225 }
226
227 boost::shared_ptr<TServerType> pServer;
Ben Craig1684c422015-04-24 08:52:44 -0500228 boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
229 boost::shared_ptr<boost::thread> pServerThread;
230};
231
Jim King79c99112015-04-30 07:10:08 -0400232BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
Ben Craig1684c422015-04-24 08:52:44 -0500233
Jim King79c99112015-04-30 07:10:08 -0400234BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
Ben Craig1684c422015-04-24 08:52:44 -0500235{
Jim King79c99112015-04-30 07:10:08 -0400236 baseline(3, 1);
Ben Craig1684c422015-04-24 08:52:44 -0500237}
238
Jim King79c99112015-04-30 07:10:08 -0400239BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
240{
241 baseline(10, 10);
242}
243
244BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
245{
246 pServer->setConcurrentClientLimit(4);
247 baseline(10, 4);
248}
249
250BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
251{
252 pServer->getThreadManager()->threadFactory(
253 boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
254 new apache::thrift::concurrency::PlatformThreadFactory));
255 pServer->getThreadManager()->start();
256
257 // thread factory has 4 threads as a default
258 // thread factory however is a bad way to limit concurrent clients
259 // as accept() will be called to grab a 5th client socket, in this case
260 // and then the thread factory will block adding the thread to manage
261 // that client.
262 baseline(10, 5);
263}
264
265BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
266{
267 pServer->getThreadManager()->threadFactory(
268 boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
269 new apache::thrift::concurrency::PlatformThreadFactory));
270 pServer->getThreadManager()->start();
271 pServer->setConcurrentClientLimit(4);
272
273 baseline(10, 4);
274}
275
276BOOST_AUTO_TEST_SUITE_END()
277
278
279BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
280
Ben Craig1684c422015-04-24 08:52:44 -0500281BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
282{
283 // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
284
285 startServer();
286
Jim King79c99112015-04-30 07:10:08 -0400287 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500288 pClientSock1->open();
289
Jim King79c99112015-04-30 07:10:08 -0400290 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500291 pClientSock2->open();
292
293 // Ensure they have been accepted
294 blockUntilAccepted(2);
295
296 // The test fixture destructor will force the sockets to disconnect
297 // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
298 stopServer();
299
300 // extra proof the server end disconnected the clients
301 uint8_t buf[1];
302 BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
303 BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
Ben Craig1684c422015-04-24 08:52:44 -0500304}
305
306BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
307{
308 // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
309 // disconnect.
310
311 boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
312 setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
Jim King79c99112015-04-30 07:10:08 -0400313
Ben Craig1684c422015-04-24 08:52:44 -0500314 startServer();
315
Jim King79c99112015-04-30 07:10:08 -0400316 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500317 pClientSock1->open();
318
Jim King79c99112015-04-30 07:10:08 -0400319 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500320 pClientSock2->open();
321
322 // Ensure they have been accepted
323 blockUntilAccepted(2);
324
Jim King79c99112015-04-30 07:10:08 -0400325 boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
326 boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
Ben Craig1684c422015-04-24 08:52:44 -0500327
328 // Once the clients disconnect the server will stop
329 stopServer();
Jim King79c99112015-04-30 07:10:08 -0400330 t1.join();
331 t2.join();
Ben Craig1684c422015-04-24 08:52:44 -0500332}
Jim King79c99112015-04-30 07:10:08 -0400333
334BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
335{
336 startServer();
337
338 BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
339 pServer->setConcurrentClientLimit(2);
340 BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
341 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
342
343 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
344 pClientSock1->open();
345 blockUntilAccepted(1);
346 BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
347
348 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
349 pClientSock2->open();
350 blockUntilAccepted(2);
351 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
352
353 // a third client cannot connect until one of the other two closes
354 boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
355 boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
356 pClientSock2->open();
357 blockUntilAccepted(2);
358 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
359 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
360
361 stopServer();
362 t2.join();
363}
364
Ben Craig1684c422015-04-24 08:52:44 -0500365BOOST_AUTO_TEST_SUITE_END()