blob: e933d6be347d9fcdb42666e193862000ea9c0b04 [file] [log] [blame]
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#define BOOST_TEST_MODULE TNonblockingServerTest
21#include <boost/test/unit_test.hpp>
22#include <boost/smart_ptr.hpp>
23
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090024#include "thrift/concurrency/Monitor.h"
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090025#include "thrift/concurrency/Thread.h"
26#include "thrift/server/TNonblockingServer.h"
27
28#include "gen-cpp/ParentService.h"
29
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +090030#include <event.h>
31
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090032using namespace apache::thrift;
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090033using apache::thrift::concurrency::Guard;
34using apache::thrift::concurrency::Monitor;
35using apache::thrift::concurrency::Mutex;
36using apache::thrift::server::TServerEventHandler;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090037
38struct Handler : public test::ParentServiceIf {
39 void addString(const std::string& s) { strings_.push_back(s); }
40 void getStrings(std::vector<std::string>& _return) { _return = strings_; }
41 std::vector<std::string> strings_;
42
43 // dummy overrides not used in this test
44 int32_t incrementGeneration() { return 0; }
45 int32_t getGeneration() { return 0; }
ben-craigfae08e72015-07-15 11:34:47 -050046 void getDataWait(std::string&, const int32_t) {}
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090047 void onewayWait() {}
48 void exceptionWait(const std::string&) {}
49 void unexpectedExceptionWait(const std::string&) {}
50};
51
52class Fixture {
53private:
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090054 struct ListenEventHandler : public TServerEventHandler {
55 public:
56 ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
57
58 void preServe() /* override */ {
59 Guard g(listenMonitor_.mutex());
60 ready_ = true;
61 listenMonitor_.notify();
62 }
63
64 Monitor listenMonitor_;
65 bool ready_;
66 };
67
Ben Craig7207c222015-07-06 08:40:35 -050068 struct Runner : public apache::thrift::concurrency::Runnable {
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090069 int port;
70 boost::shared_ptr<event_base> userEventBase;
71 boost::shared_ptr<TProcessor> processor;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090072 boost::shared_ptr<server::TNonblockingServer> server;
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090073 boost::shared_ptr<ListenEventHandler> listenHandler;
74 Mutex mutex_;
75
76 Runner() {
77 listenHandler.reset(new ListenEventHandler(&mutex_));
78 }
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090079
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090080 virtual void run() {
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090081 // When binding to explicit port, allow retrying to workaround bind failures on ports in use
82 int retryCount = port ? 10 : 0;
83 startServer(retryCount);
84 }
85
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090086 void readyBarrier() {
87 // block until server is listening and ready to accept connections
88 Guard g(mutex_);
89 while (!listenHandler->ready_) {
90 listenHandler->listenMonitor_.wait();
91 }
92 }
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090093 private:
94 void startServer(int retry_count) {
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +090095 try {
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090096 server.reset(new server::TNonblockingServer(processor, port));
Buğra Gedik36d1b0d2016-09-04 17:18:15 +090097 server->setServerEventHandler(listenHandler);
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +090098 if (userEventBase) {
99 server->registerEvents(userEventBase.get());
100 }
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900101 server->serve();
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900102 } catch (const transport::TTransportException&) {
103 if (retry_count > 0) {
104 ++port;
105 startServer(retry_count - 1);
106 } else {
107 throw;
108 }
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900109 }
110 }
111 };
112
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +0900113 struct EventDeleter {
114 void operator()(event_base* p) { event_base_free(p); }
115 };
116
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900117protected:
118 Fixture() : processor(new test::ParentServiceProcessor(boost::make_shared<Handler>())) {}
119
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +0900120 ~Fixture() {
121 if (server) {
122 server->stop();
123 }
124 if (thread) {
125 thread->join();
126 }
127 }
128
129 void setEventBase(event_base* user_event_base) {
130 userEventBase_.reset(user_event_base, EventDeleter());
131 }
132
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900133 int startServer(int port) {
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900134 boost::shared_ptr<Runner> runner(new Runner);
135 runner->port = port;
136 runner->processor = processor;
137 runner->userEventBase = userEventBase_;
138
Ben Craig7207c222015-07-06 08:40:35 -0500139 boost::scoped_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory(
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900140 new apache::thrift::concurrency::PlatformThreadFactory(
Nobuaki Sukegawa28256642014-12-16 03:24:37 +0900141#if !USE_BOOST_THREAD && !USE_STD_THREAD
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900142 concurrency::PlatformThreadFactory::OTHER, concurrency::PlatformThreadFactory::NORMAL,
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900143 1,
144#endif
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900145 false));
146 thread = threadFactory->newThread(runner);
147 thread->start();
Buğra Gedik36d1b0d2016-09-04 17:18:15 +0900148 runner->readyBarrier();
149
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900150 server = runner->server;
151 return runner->port;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900152 }
153
154 bool canCommunicate(int serverPort) {
155 boost::shared_ptr<transport::TSocket> socket(new transport::TSocket("localhost", serverPort));
156 socket->open();
157 test::ParentServiceClient client(boost::make_shared<protocol::TBinaryProtocol>(
158 boost::make_shared<transport::TFramedTransport>(socket)));
159 client.addString("foo");
160 std::vector<std::string> strings;
161 client.getStrings(strings);
162 return strings.size() == 1 && !(strings[0].compare("foo"));
163 }
164
165private:
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +0900166 boost::shared_ptr<event_base> userEventBase_;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900167 boost::shared_ptr<test::ParentServiceProcessor> processor;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900168protected:
169 boost::shared_ptr<server::TNonblockingServer> server;
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +0900170private:
171 boost::shared_ptr<apache::thrift::concurrency::Thread> thread;
172
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900173};
174
175BOOST_AUTO_TEST_SUITE(TNonblockingServerTest)
176
177BOOST_FIXTURE_TEST_CASE(get_specified_port, Fixture) {
178 int specified_port = startServer(12345);
179 BOOST_REQUIRE_GE(specified_port, 12345);
180 BOOST_REQUIRE_EQUAL(server->getListenPort(), specified_port);
181 BOOST_CHECK(canCommunicate(specified_port));
182
183 server->stop();
184 BOOST_CHECK_EQUAL(server->getListenPort(), specified_port);
185}
186
187BOOST_FIXTURE_TEST_CASE(get_assigned_port, Fixture) {
188 int specified_port = startServer(0);
189 BOOST_REQUIRE_EQUAL(specified_port, 0);
190 int assigned_port = server->getListenPort();
191 BOOST_REQUIRE_NE(assigned_port, 0);
192 BOOST_CHECK(canCommunicate(assigned_port));
193
194 server->stop();
195 BOOST_CHECK_EQUAL(server->getListenPort(), 0);
196}
197
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +0900198BOOST_FIXTURE_TEST_CASE(provide_event_base, Fixture) {
199 event_base* eb = event_base_new();
200 setEventBase(eb);
201 startServer(0);
202
203 // assert that the server works
204 BOOST_CHECK(canCommunicate(server->getListenPort()));
205#if LIBEVENT_VERSION_NUMBER > 0x02010400
206 // also assert that the event_base is actually used when it's easy
207 BOOST_CHECK_GT(event_base_get_num_events(eb, EVENT_BASE_COUNT_ADDED), 0);
208#endif
209}
210
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900211BOOST_AUTO_TEST_SUITE_END()