/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#define BOOST_TEST_MODULE TServerIntegrationTest
21#include <boost/test/auto_unit_test.hpp>
22#include <boost/bind.hpp>
23#include <boost/format.hpp>
24#include <boost/shared_ptr.hpp>
25#include <boost/thread.hpp>
26#include <thrift/server/TThreadedServer.h>
27#include <thrift/protocol/TBinaryProtocol.h>
28#include <thrift/transport/TServerSocket.h>
29#include <thrift/transport/TSocket.h>
30#include <thrift/transport/TTransport.h>
31#include "gen-cpp/ParentService.h"
32#include "TestPortFixture.h"
33#include <vector>
34
35using apache::thrift::concurrency::Guard;
36using apache::thrift::concurrency::Monitor;
37using apache::thrift::concurrency::Mutex;
38using apache::thrift::concurrency::Synchronized;
39using apache::thrift::protocol::TBinaryProtocol;
40using apache::thrift::protocol::TBinaryProtocolFactory;
41using apache::thrift::protocol::TProtocol;
42using apache::thrift::protocol::TProtocolFactory;
43using apache::thrift::transport::TServerSocket;
44using apache::thrift::transport::TServerTransport;
45using apache::thrift::transport::TSocket;
46using apache::thrift::transport::TTransport;
47using apache::thrift::transport::TTransportFactory;
48using apache::thrift::server::TServerEventHandler;
49using apache::thrift::server::TThreadedServer;
50using apache::thrift::test::ParentServiceClient;
51using apache::thrift::test::ParentServiceIf;
52using apache::thrift::test::ParentServiceProcessor;
53
54/**
55 * preServe runs after listen() is successful, when we can connect
56 */
57class TServerReadyEventHandler : public TServerEventHandler, public Monitor
58{
59public:
60 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
61 virtual ~TServerReadyEventHandler() {}
62 virtual void preServe() {
63 Synchronized sync(*this);
64 isListening_ = true;
65 notify();
66 }
67 virtual void* createContext(boost::shared_ptr<TProtocol> input,
68 boost::shared_ptr<TProtocol> output) {
69 Synchronized sync(*this);
70 ++accepted_;
71 notify();
72
73 (void)input;
74 (void)output;
75 return NULL;
76 }
77 bool isListening() const { return isListening_; }
78 uint64_t acceptedCount() const { return accepted_; }
79private:
80 bool isListening_;
81 uint64_t accepted_;
82};
83
84class ParentHandler : virtual public ParentServiceIf {
85public:
86 ParentHandler() : generation_(0) {}
87
88 int32_t incrementGeneration() {
89 Guard g(mutex_);
90 return ++generation_;
91 }
92
93 int32_t getGeneration() {
94 Guard g(mutex_);
95 return generation_;
96 }
97
98 void addString(const std::string& s) {
99 Guard g(mutex_);
100 strings_.push_back(s);
101 }
102
103 void getStrings(std::vector<std::string>& _return) {
104 Guard g(mutex_);
105 _return = strings_;
106 }
107
108 void getDataWait(std::string& _return, int32_t length) {
109 }
110
111 void onewayWait() {
112 }
113
114 void exceptionWait(const std::string& message) {
115 }
116
117 void unexpectedExceptionWait(const std::string& message) {
118 }
119
120protected:
121 Mutex mutex_;
122 int32_t generation_;
123 std::vector<std::string> strings_;
124};
125
126class TServerIntegrationTestFixture : public TestPortFixture
127{
128public:
129 TServerIntegrationTestFixture() :
130 pServer(new TThreadedServer(
131 boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
132 boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
133 boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
134 boost::shared_ptr<TTransportFactory>(new TTransportFactory),
135 boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
136 pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
137 {
138 pServer->setServerEventHandler(pEventHandler);
139 }
140
141 void startServer() {
142 pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
143
144 // block until listen() completes so clients will be able to connect
145 Synchronized sync(*(pEventHandler.get()));
146 while (!pEventHandler->isListening()) {
147 pEventHandler->wait();
148 }
149
150 BOOST_MESSAGE("server is listening");
151 }
152
153 void blockUntilAccepted(uint64_t numAccepted) {
154 Synchronized sync(*(pEventHandler.get()));
155 while (pEventHandler->acceptedCount() < numAccepted) {
156 pEventHandler->wait();
157 }
158
159 BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
160 }
161
162 void stopServer() {
163 pServer->stop();
164 BOOST_MESSAGE("server stop completed");
165 pServerThread->join();
166 BOOST_MESSAGE("server thread joined");
167 }
168
169 ~TServerIntegrationTestFixture() {
170 stopServer();
171 }
172
173 void delayClose(boost::shared_ptr<TTransport> toClose) {
174 boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
175 toClose->close();
176 }
177
178 boost::shared_ptr<TThreadedServer> pServer;
179 boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
180 boost::shared_ptr<boost::thread> pServerThread;
181};
182
183BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
184
185BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
186{
187 // this test establishes some basic sanity
188
189 startServer();
190 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
191 boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
192 ParentServiceClient client1(pClientProtocol1);
193 pClientSock1->open();
194 client1.incrementGeneration();
195 pClientSock1->close();
196 stopServer();
197}
198
199BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
200{
201 // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
202
203 startServer();
204
205 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
206 pClientSock1->open();
207
208 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
209 pClientSock2->open();
210
211 // Ensure they have been accepted
212 blockUntilAccepted(2);
213
214 // The test fixture destructor will force the sockets to disconnect
215 // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
216 stopServer();
217
218 // extra proof the server end disconnected the clients
219 uint8_t buf[1];
220 BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
221 BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
222 pClientSock1->close();
223 pClientSock2->close();
224}
225
226BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
227{
228 // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
229 // disconnect.
230
231 boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
232 setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
233 startServer();
234
235 boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
236 pClientSock1->open();
237
238 boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
239 pClientSock2->open();
240
241 // Ensure they have been accepted
242 blockUntilAccepted(2);
243
244 boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
245 boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
246
247 // Once the clients disconnect the server will stop
248 stopServer();
249
250 pClientSock1->close();
251 pClientSock2->close();
252}
253BOOST_AUTO_TEST_SUITE_END()