blob: fec1b9cf524a5614adda0d3120e39eabd58640a5 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#define BOOST_TEST_MODULE TServerIntegrationTest
21#include <boost/test/auto_unit_test.hpp>
22#include <boost/bind.hpp>
Jim King79c99112015-04-30 07:10:08 -040023#include <boost/foreach.hpp>
Ben Craig1684c422015-04-24 08:52:44 -050024#include <boost/format.hpp>
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +020025#include <boost/make_shared.hpp>
Ben Craig1684c422015-04-24 08:52:44 -050026#include <boost/shared_ptr.hpp>
27#include <boost/thread.hpp>
Jim King79c99112015-04-30 07:10:08 -040028#include <thrift/server/TSimpleServer.h>
29#include <thrift/server/TThreadPoolServer.h>
Ben Craig1684c422015-04-24 08:52:44 -050030#include <thrift/server/TThreadedServer.h>
31#include <thrift/protocol/TBinaryProtocol.h>
32#include <thrift/transport/TServerSocket.h>
33#include <thrift/transport/TSocket.h>
34#include <thrift/transport/TTransport.h>
35#include "gen-cpp/ParentService.h"
36#include "TestPortFixture.h"
37#include <vector>
38
39using apache::thrift::concurrency::Guard;
40using apache::thrift::concurrency::Monitor;
41using apache::thrift::concurrency::Mutex;
42using apache::thrift::concurrency::Synchronized;
43using apache::thrift::protocol::TBinaryProtocol;
44using apache::thrift::protocol::TBinaryProtocolFactory;
45using apache::thrift::protocol::TProtocol;
46using apache::thrift::protocol::TProtocolFactory;
47using apache::thrift::transport::TServerSocket;
48using apache::thrift::transport::TServerTransport;
49using apache::thrift::transport::TSocket;
50using apache::thrift::transport::TTransport;
Jim King79c99112015-04-30 07:10:08 -040051using apache::thrift::transport::TTransportException;
Ben Craig1684c422015-04-24 08:52:44 -050052using apache::thrift::transport::TTransportFactory;
Jim King79c99112015-04-30 07:10:08 -040053using apache::thrift::server::TServer;
Ben Craig1684c422015-04-24 08:52:44 -050054using apache::thrift::server::TServerEventHandler;
Jim King79c99112015-04-30 07:10:08 -040055using apache::thrift::server::TSimpleServer;
56using apache::thrift::server::TThreadPoolServer;
Ben Craig1684c422015-04-24 08:52:44 -050057using apache::thrift::server::TThreadedServer;
58using apache::thrift::test::ParentServiceClient;
59using apache::thrift::test::ParentServiceIf;
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +020060using apache::thrift::test::ParentServiceIfFactory;
61using apache::thrift::test::ParentServiceIfSingletonFactory;
Ben Craig1684c422015-04-24 08:52:44 -050062using apache::thrift::test::ParentServiceProcessor;
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +020063using apache::thrift::test::ParentServiceProcessorFactory;
64using apache::thrift::TProcessor;
65using apache::thrift::TProcessorFactory;
Jim King79c99112015-04-30 07:10:08 -040066using boost::posix_time::milliseconds;
Ben Craig1684c422015-04-24 08:52:44 -050067
68/**
69 * preServe runs after listen() is successful, when we can connect
70 */
71class TServerReadyEventHandler : public TServerEventHandler, public Monitor
72{
73public:
74 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
75 virtual ~TServerReadyEventHandler() {}
76 virtual void preServe() {
77 Synchronized sync(*this);
78 isListening_ = true;
79 notify();
80 }
81 virtual void* createContext(boost::shared_ptr<TProtocol> input,
82 boost::shared_ptr<TProtocol> output) {
83 Synchronized sync(*this);
84 ++accepted_;
85 notify();
86
87 (void)input;
88 (void)output;
89 return NULL;
90 }
91 bool isListening() const { return isListening_; }
92 uint64_t acceptedCount() const { return accepted_; }
93private:
94 bool isListening_;
95 uint64_t accepted_;
96};
97
Jim King79c99112015-04-30 07:10:08 -040098/**
99 * Reusing another generated test, just something to serve up
100 */
101class ParentHandler : public ParentServiceIf {
Ben Craig1684c422015-04-24 08:52:44 -0500102public:
103 ParentHandler() : generation_(0) {}
104
105 int32_t incrementGeneration() {
106 Guard g(mutex_);
107 return ++generation_;
108 }
109
110 int32_t getGeneration() {
111 Guard g(mutex_);
112 return generation_;
113 }
114
115 void addString(const std::string& s) {
116 Guard g(mutex_);
117 strings_.push_back(s);
118 }
119
120 void getStrings(std::vector<std::string>& _return) {
121 Guard g(mutex_);
122 _return = strings_;
123 }
124
125 void getDataWait(std::string& _return, int32_t length) {
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200126 THRIFT_UNUSED_VARIABLE(_return);
127 THRIFT_UNUSED_VARIABLE(length);
Ben Craig1684c422015-04-24 08:52:44 -0500128 }
129
130 void onewayWait() {
131 }
132
133 void exceptionWait(const std::string& message) {
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200134 THRIFT_UNUSED_VARIABLE(message);
Ben Craig1684c422015-04-24 08:52:44 -0500135 }
136
137 void unexpectedExceptionWait(const std::string& message) {
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200138 THRIFT_UNUSED_VARIABLE(message);
Ben Craig1684c422015-04-24 08:52:44 -0500139 }
140
141protected:
142 Mutex mutex_;
143 int32_t generation_;
144 std::vector<std::string> strings_;
145};
146
Jim King79c99112015-04-30 07:10:08 -0400147void autoSocketCloser(TSocket *pSock) {
148 pSock->close();
149 delete pSock;
150}
151
152template<class TServerType>
Ben Craig1684c422015-04-24 08:52:44 -0500153class TServerIntegrationTestFixture : public TestPortFixture
154{
155public:
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200156 TServerIntegrationTestFixture(const boost::shared_ptr<TProcessorFactory>& _processorFactory) :
Jim King79c99112015-04-30 07:10:08 -0400157 pServer(new TServerType(
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200158 _processorFactory,
Ben Craig1684c422015-04-24 08:52:44 -0500159 boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
160 boost::shared_ptr<TTransportFactory>(new TTransportFactory),
161 boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
162 pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
163 {
164 pServer->setServerEventHandler(pEventHandler);
165 }
166
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200167 TServerIntegrationTestFixture(const boost::shared_ptr<TProcessor>& _processor) :
168 pServer(new TServerType(
169 _processor,
170 boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
171 boost::shared_ptr<TTransportFactory>(new TTransportFactory),
172 boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
173 pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
174 {
175 pServer->setServerEventHandler(pEventHandler);
176 }
177
Ben Craig1684c422015-04-24 08:52:44 -0500178 void startServer() {
Jim King79c99112015-04-30 07:10:08 -0400179 pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
Ben Craig1684c422015-04-24 08:52:44 -0500180
181 // block until listen() completes so clients will be able to connect
182 Synchronized sync(*(pEventHandler.get()));
183 while (!pEventHandler->isListening()) {
184 pEventHandler->wait();
185 }
186
187 BOOST_MESSAGE("server is listening");
188 }
189
190 void blockUntilAccepted(uint64_t numAccepted) {
191 Synchronized sync(*(pEventHandler.get()));
192 while (pEventHandler->acceptedCount() < numAccepted) {
193 pEventHandler->wait();
194 }
195
196 BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
197 }
198
199 void stopServer() {
Jim King79c99112015-04-30 07:10:08 -0400200 if (pServerThread) {
201 pServer->stop();
202 BOOST_MESSAGE("server stop completed");
203
204 pServerThread->join();
205 BOOST_MESSAGE("server thread joined");
206 pServerThread.reset();
207 }
Ben Craig1684c422015-04-24 08:52:44 -0500208 }
209
210 ~TServerIntegrationTestFixture() {
211 stopServer();
212 }
213
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200214 int getServerPort() {
215 TServerSocket *pSock = dynamic_cast<TServerSocket *>(pServer->getServerTransport().get());
216 return pSock->getPort();
217 }
218
Jim King79c99112015-04-30 07:10:08 -0400219 void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
220 boost::this_thread::sleep(after);
Ben Craig1684c422015-04-24 08:52:44 -0500221 toClose->close();
222 }
223
Jim King79c99112015-04-30 07:10:08 -0400224 void baseline(int64_t numToMake, int64_t expectedHWM) {
225 startServer();
226 std::vector<boost::shared_ptr<TSocket> > holdSockets;
227 std::vector<boost::shared_ptr<boost::thread> > holdThreads;
228
229 for (int64_t i = 0; i < numToMake; ++i) {
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200230 boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", getServerPort()), autoSocketCloser);
Jim King79c99112015-04-30 07:10:08 -0400231 holdSockets.push_back(pClientSock);
232 boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
233 ParentServiceClient client(pClientProtocol);
234 pClientSock->open();
235 client.incrementGeneration();
236 holdThreads.push_back(
237 boost::shared_ptr<boost::thread>(
238 new boost::thread(
239 boost::bind(&TServerIntegrationTestFixture::delayClose, this,
240 pClientSock, milliseconds(100 * numToMake)))));
241 }
242
243 BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
244 stopServer();
245 BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
246 pThread->join();
247 }
248 holdThreads.clear();
249 holdSockets.clear();
250 }
251
252 boost::shared_ptr<TServerType> pServer;
Ben Craig1684c422015-04-24 08:52:44 -0500253 boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
254 boost::shared_ptr<boost::thread> pServerThread;
255};
256
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200257template<class TServerType>
258class TServerIntegrationProcessorFactoryTestFixture : public TServerIntegrationTestFixture<TServerType>
259{
260public:
261 TServerIntegrationProcessorFactoryTestFixture() :
262 TServerIntegrationTestFixture<TServerType>(
263 boost::make_shared<ParentServiceProcessorFactory>(
264 boost::make_shared<ParentServiceIfSingletonFactory>(
265 boost::make_shared<ParentHandler>()))) { }
266};
Ben Craig1684c422015-04-24 08:52:44 -0500267
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200268template<class TServerType>
269class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture<TServerType>
270{
271public:
272 TServerIntegrationProcessorTestFixture() :
273 TServerIntegrationTestFixture<TServerType>(
274 boost::make_shared<ParentServiceProcessor>(
275 boost::make_shared<ParentHandler>())) { }
276};
277
278BOOST_AUTO_TEST_SUITE(constructors)
279
280BOOST_FIXTURE_TEST_CASE(test_simple_factory, TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>)
Ben Craig1684c422015-04-24 08:52:44 -0500281{
Jim King79c99112015-04-30 07:10:08 -0400282 baseline(3, 1);
Ben Craig1684c422015-04-24 08:52:44 -0500283}
284
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200285BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>)
286{
287 baseline(3, 1);
288}
289
290BOOST_FIXTURE_TEST_CASE(test_threaded_factory, TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>)
Jim King79c99112015-04-30 07:10:08 -0400291{
292 baseline(10, 10);
293}
294
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200295BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>)
296{
297 baseline(10, 10);
298}
299
300BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationProcessorTestFixture<TThreadedServer>)
Jim King79c99112015-04-30 07:10:08 -0400301{
302 pServer->setConcurrentClientLimit(4);
303 baseline(10, 4);
304}
305
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200306BOOST_FIXTURE_TEST_CASE(test_threadpool_factory, TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>)
Jim King79c99112015-04-30 07:10:08 -0400307{
308 pServer->getThreadManager()->threadFactory(
309 boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
310 new apache::thrift::concurrency::PlatformThreadFactory));
311 pServer->getThreadManager()->start();
312
313 // thread factory has 4 threads as a default
314 // thread factory however is a bad way to limit concurrent clients
315 // as accept() will be called to grab a 5th client socket, in this case
316 // and then the thread factory will block adding the thread to manage
317 // that client.
318 baseline(10, 5);
319}
320
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200321BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationProcessorTestFixture<TThreadPoolServer>)
322{
323 pServer->getThreadManager()->threadFactory(
324 boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
325 new apache::thrift::concurrency::PlatformThreadFactory));
326 pServer->getThreadManager()->start();
327
328 // thread factory has 4 threads as a default
329 // thread factory however is a bad way to limit concurrent clients
330 // as accept() will be called to grab a 5th client socket, in this case
331 // and then the thread factory will block adding the thread to manage
332 // that client.
333 baseline(10, 5);
334}
335
336BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationProcessorTestFixture<TThreadPoolServer>)
Jim King79c99112015-04-30 07:10:08 -0400337{
338 pServer->getThreadManager()->threadFactory(
339 boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
340 new apache::thrift::concurrency::PlatformThreadFactory));
341 pServer->getThreadManager()->start();
342 pServer->setConcurrentClientLimit(4);
343
344 baseline(10, 4);
345}
346
347BOOST_AUTO_TEST_SUITE_END()
348
349
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200350BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationProcessorTestFixture<TThreadedServer> )
Jim King79c99112015-04-30 07:10:08 -0400351
Ben Craig1684c422015-04-24 08:52:44 -0500352BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
353{
354 // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
355
356 startServer();
357
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200358 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500359 pClientSock1->open();
360
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200361 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500362 pClientSock2->open();
363
364 // Ensure they have been accepted
365 blockUntilAccepted(2);
366
367 // The test fixture destructor will force the sockets to disconnect
368 // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
369 stopServer();
370
371 // extra proof the server end disconnected the clients
372 uint8_t buf[1];
373 BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
374 BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
Ben Craig1684c422015-04-24 08:52:44 -0500375}
376
377BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
378{
379 // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
380 // disconnect.
381
382 boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
383 setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
Jim King79c99112015-04-30 07:10:08 -0400384
Ben Craig1684c422015-04-24 08:52:44 -0500385 startServer();
386
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200387 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500388 pClientSock1->open();
389
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200390 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser);
Ben Craig1684c422015-04-24 08:52:44 -0500391 pClientSock2->open();
392
393 // Ensure they have been accepted
394 blockUntilAccepted(2);
395
Jim King79c99112015-04-30 07:10:08 -0400396 boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
397 boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
Ben Craig1684c422015-04-24 08:52:44 -0500398
399 // Once the clients disconnect the server will stop
400 stopServer();
Jim King79c99112015-04-30 07:10:08 -0400401 t1.join();
402 t2.join();
Ben Craig1684c422015-04-24 08:52:44 -0500403}
Jim King79c99112015-04-30 07:10:08 -0400404
405BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
406{
407 startServer();
408
409 BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
410 pServer->setConcurrentClientLimit(2);
411 BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
412 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
413
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200414 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), autoSocketCloser);
Jim King79c99112015-04-30 07:10:08 -0400415 pClientSock1->open();
416 blockUntilAccepted(1);
417 BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
418
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200419 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), autoSocketCloser);
Jim King79c99112015-04-30 07:10:08 -0400420 pClientSock2->open();
421 blockUntilAccepted(2);
422 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
423
424 // a third client cannot connect until one of the other two closes
425 boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
Konrad Grochowski24ea0bf2015-05-07 14:59:29 +0200426 boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", getServerPort()), autoSocketCloser);
Jim King79c99112015-04-30 07:10:08 -0400427 pClientSock2->open();
428 blockUntilAccepted(2);
429 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
430 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
431
432 stopServer();
433 t2.join();
434}
435
Ben Craig1684c422015-04-24 08:52:44 -0500436BOOST_AUTO_TEST_SUITE_END()