blob: 649910f2f06e3af2786458ee26dba90b174e6d1c [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Konrad Grochowski9be4e682013-06-22 22:03:31 +020020#include <thrift/thrift-config.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000021
Roger Meier4285ba22013-06-10 21:17:23 +020022#include <thrift/server/TNonblockingServer.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/concurrency/Exception.h>
24#include <thrift/transport/TSocket.h>
25#include <thrift/concurrency/PlatformThreadFactory.h>
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040026#include <thrift/transport/PlatformSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000027
Mark Sleee02385b2007-06-09 01:21:16 +000028#include <iostream>
Lei Feiweib5ebcd12015-04-04 22:12:07 +080029
30#ifdef HAVE_SYS_SELECT_H
31#include <sys/select.h>
32#endif
Roger Meier30aae0c2011-07-08 12:23:31 +000033
34#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000035#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000036#endif
37
38#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000039#include <netinet/in.h>
40#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000041#endif
42
43#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000044#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000045#endif
46
47#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000048#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000049#endif
50
Roger Meier2fa9c312011-09-05 19:15:53 +000051#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000052#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000053#endif
54
Mark Slee2f6404d2006-10-10 01:37:40 +000055#include <assert.h>
Roger Meier12d70532011-12-14 23:35:28 +000056
57#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +000058#include <sched.h>
Roger Meier12d70532011-12-14 23:35:28 +000059#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000060
David Reiss9b903442009-10-21 05:51:28 +000061#ifndef AF_LOCAL
62#define AF_LOCAL AF_UNIX
63#endif
64
James E. King, III7edc8fa2017-01-20 10:11:41 -050065#ifdef HAVE_INTTYPES_H
66#include <inttypes.h>
Roger Meier12d70532011-12-14 23:35:28 +000067#endif
68
James E. King, III7edc8fa2017-01-20 10:11:41 -050069#ifdef HAVE_STDINT_H
70#include <stdint.h>
Antonio Di Monaco796667b2016-01-04 23:05:19 +010071#endif
72
Konrad Grochowski16a23a62014-11-13 15:33:38 +010073namespace apache {
74namespace thrift {
75namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000076
T Jake Lucianib5e62212009-01-31 22:36:20 +000077using namespace apache::thrift::protocol;
78using namespace apache::thrift::transport;
79using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000080using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000081using apache::thrift::transport::TSocket;
82using apache::thrift::transport::TTransportException;
Jake Farrellb0d95602011-12-06 01:17:26 +000083using boost::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000084
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000085/// Three states for sockets: recv frame size, recv data, and send mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +010086enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000087
88/**
89 * Five states for the nonblocking server:
90 * 1) initialize
91 * 2) read 4 byte frame size
92 * 3) read frame of data
93 * 4) send back data (if any)
94 * 5) force immediate connection close
95 */
96enum TAppState {
97 APP_INIT,
98 APP_READ_FRAME_SIZE,
99 APP_READ_REQUEST,
100 APP_WAIT_TASK,
101 APP_SEND_RESULT,
102 APP_CLOSE_CONNECTION
103};
104
105/**
106 * Represents a connection that is handled via libevent. This connection
107 * essentially encapsulates a socket that has some associated libevent state.
108 */
109class TNonblockingServer::TConnection {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100110private:
Jake Farrellb0d95602011-12-06 01:17:26 +0000111 /// Server IO Thread handling this connection
112 TNonblockingIOThread* ioThread_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000113
114 /// Server handle
115 TNonblockingServer* server_;
116
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000117 /// TProcessor
118 boost::shared_ptr<TProcessor> processor_;
119
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000120 /// Object wrapping network socket
121 boost::shared_ptr<TSocket> tSocket_;
122
123 /// Libevent object
124 struct event event_;
125
126 /// Libevent flags
127 short eventFlags_;
128
129 /// Socket mode
130 TSocketState socketState_;
131
132 /// Application state
133 TAppState appState_;
134
135 /// How much data needed to read
136 uint32_t readWant_;
137
138 /// Where in the read buffer are we
139 uint32_t readBufferPos_;
140
141 /// Read buffer
142 uint8_t* readBuffer_;
143
144 /// Read buffer size
145 uint32_t readBufferSize_;
146
147 /// Write buffer
148 uint8_t* writeBuffer_;
149
150 /// Write buffer size
151 uint32_t writeBufferSize_;
152
153 /// How far through writing are we?
154 uint32_t writeBufferPos_;
155
156 /// Largest size of write buffer seen since buffer was constructed
157 size_t largestWriteBufferSize_;
158
159 /// Count of the number of calls for use with getResizeBufferEveryN().
160 int32_t callsForResize_;
161
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000162 /// Transport to read from
163 boost::shared_ptr<TMemoryBuffer> inputTransport_;
164
165 /// Transport that processor writes to
166 boost::shared_ptr<TMemoryBuffer> outputTransport_;
167
168 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
169 boost::shared_ptr<TTransport> factoryInputTransport_;
170 boost::shared_ptr<TTransport> factoryOutputTransport_;
171
172 /// Protocol decoder
173 boost::shared_ptr<TProtocol> inputProtocol_;
174
175 /// Protocol encoder
176 boost::shared_ptr<TProtocol> outputProtocol_;
177
178 /// Server event handler, if any
179 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
180
181 /// Thrift call context, if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100182 void* connectionContext_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000183
184 /// Go into read mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100185 void setRead() { setFlags(EV_READ | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000186
187 /// Go into write mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100188 void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000189
190 /// Set socket idle
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100191 void setIdle() { setFlags(0); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000192
193 /**
194 * Set event flags for this connection.
195 *
196 * @param eventFlags flags we pass to libevent for the connection.
197 */
198 void setFlags(short eventFlags);
199
200 /**
201 * Libevent handler called (via our static wrapper) when the connection
202 * socket had something happen. Rather than use the flags libevent passed,
203 * we use the connection state to determine whether we need to read or
204 * write the socket.
205 */
206 void workSocket();
207
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100208public:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000209 class Task;
210
211 /// Constructor
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100212 TConnection(THRIFT_SOCKET socket,
213 TNonblockingIOThread* ioThread,
214 const sockaddr* addr,
215 socklen_t addrLen) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000216 readBuffer_ = NULL;
217 readBufferSize_ = 0;
218
Jake Farrellb0d95602011-12-06 01:17:26 +0000219 ioThread_ = ioThread;
220 server_ = ioThread->getServer();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000221
Jake Farrellb0d95602011-12-06 01:17:26 +0000222 // Allocate input and output transports these only need to be allocated
223 // once per TConnection (they don't need to be reallocated on init() call)
224 inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400225 outputTransport_.reset(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100226 new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
Jake Farrellb0d95602011-12-06 01:17:26 +0000227 tSocket_.reset(new TSocket());
228 init(socket, ioThread, addr, addrLen);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000229 }
230
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100231 ~TConnection() { std::free(readBuffer_); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000232
Roger Meier0c04fcc2013-03-22 19:52:08 +0100233 /// Close this connection and free or reset its resources.
234 void close();
235
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100236 /**
237 * Check buffers against any size limits and shrink it if exceeded.
238 *
239 * @param readLimit we reduce read buffer size to this (if nonzero).
240 * @param writeLimit if nonzero and write buffer is larger, replace it.
241 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000242 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
243
244 /// Initialize
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100245 void init(THRIFT_SOCKET socket,
246 TNonblockingIOThread* ioThread,
247 const sockaddr* addr,
248 socklen_t addrLen);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000249
250 /**
251 * This is called when the application transitions from one state into
252 * another. This means that it has finished writing the data that it needed
253 * to, or finished receiving the data that it needed to.
254 */
255 void transition();
256
257 /**
258 * C-callable event handler for connection events. Provides a callback
259 * that libevent can understand which invokes connection_->workSocket().
260 *
261 * @param fd the descriptor the event occurred on.
262 * @param which the flags associated with the event.
263 * @param v void* callback arg where we placed TConnection's "this".
264 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000265 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Konrad Grochowskib7af66e2014-07-08 19:22:44 +0200266 assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000267 ((TConnection*)v)->workSocket();
268 }
269
270 /**
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000271 * Notification to server that processing has ended on this request.
272 * Can be called either when processing is completed or when a waiting
273 * task has been preemptively terminated (on overload).
274 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000275 * Don't call this from the IO thread itself.
276 *
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400277 * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000278 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100279 bool notifyIOThread() { return ioThread_->notify(this); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000280
Jake Farrellb0d95602011-12-06 01:17:26 +0000281 /*
282 * Returns the number of this connection's currently assigned IO
283 * thread.
284 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100285 int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000286
287 /// Force connection shutdown for this connection.
288 void forceClose() {
289 appState_ = APP_CLOSE_CONNECTION;
Jake Farrellb0d95602011-12-06 01:17:26 +0000290 if (!notifyIOThread()) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100291 close();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000292 throw TException("TConnection::forceClose: failed write on notify pipe");
293 }
294 }
295
296 /// return the server this connection was initialized for.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100297 TNonblockingServer* getServer() const { return server_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000298
299 /// get state of connection.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100300 TAppState getState() const { return appState_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000301
302 /// return the TSocket transport wrapping this network connection
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100303 boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000304
305 /// return the server event handler if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100306 boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000307
308 /// return the Thrift connection context if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100309 void* getConnectionContext() { return connectionContext_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000310};
311
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100312class TNonblockingServer::TConnection::Task : public Runnable {
313public:
Mark Sleee02385b2007-06-09 01:21:16 +0000314 Task(boost::shared_ptr<TProcessor> processor,
315 boost::shared_ptr<TProtocol> input,
316 boost::shared_ptr<TProtocol> output,
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100317 TConnection* connection)
318 : processor_(processor),
319 input_(input),
320 output_(output),
321 connection_(connection),
322 serverEventHandler_(connection_->getServerEventHandler()),
323 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000324
325 void run() {
326 try {
David Reiss105961d2010-10-06 17:10:17 +0000327 for (;;) {
Roger Meier72957452013-06-29 00:28:50 +0200328 if (serverEventHandler_) {
David Reiss105961d2010-10-06 17:10:17 +0000329 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
330 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100331 if (!processor_->process(input_, output_, connectionContext_)
332 || !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000333 break;
334 }
335 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000336 } catch (const TTransportException& ttx) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000337 GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
Bryan Duxbury1e987582011-08-25 17:33:03 +0000338 } catch (const bad_alloc&) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000339 GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
Henrique Mendonca962b3532012-09-20 13:19:55 +0000340 exit(1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000341 } catch (const std::exception& x) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000342 GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100343 typeid(x).name(),
344 x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000345 } catch (...) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100346 GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
Mark Sleee02385b2007-06-09 01:21:16 +0000347 }
Mark Slee79b16942007-11-26 19:05:29 +0000348
David Reiss01fe1532010-03-09 05:19:25 +0000349 // Signal completion back to the libevent thread via a pipe
Jake Farrellb0d95602011-12-06 01:17:26 +0000350 if (!connection_->notifyIOThread()) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100351 GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
352 connection_->close();
David Reiss01fe1532010-03-09 05:19:25 +0000353 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000354 }
David Reiss01fe1532010-03-09 05:19:25 +0000355 }
356
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100357 TConnection* getTConnection() { return connection_; }
Mark Sleee02385b2007-06-09 01:21:16 +0000358
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100359private:
Mark Sleee02385b2007-06-09 01:21:16 +0000360 boost::shared_ptr<TProcessor> processor_;
361 boost::shared_ptr<TProtocol> input_;
362 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000363 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000364 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
365 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000366};
Mark Slee5ea15f92007-03-05 22:55:59 +0000367
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400368void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
Jake Farrellb0d95602011-12-06 01:17:26 +0000369 TNonblockingIOThread* ioThread,
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000370 const sockaddr* addr,
371 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000372 tSocket_->setSocketFD(socket);
373 tSocket_->setCachedAddress(addr, addrLen);
374
Jake Farrellb0d95602011-12-06 01:17:26 +0000375 ioThread_ = ioThread;
376 server_ = ioThread->getServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000377 appState_ = APP_INIT;
378 eventFlags_ = 0;
379
380 readBufferPos_ = 0;
381 readWant_ = 0;
382
383 writeBuffer_ = NULL;
384 writeBufferSize_ = 0;
385 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000386 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000387
David Reiss89a12942010-10-06 17:10:52 +0000388 socketState_ = SOCKET_RECV_FRAMING;
David Reiss54bec5d2010-10-06 17:10:45 +0000389 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000390
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000391 // get input/transports
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100392 factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
393 factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000394
395 // Create protocol
Dave Watson792db4e2015-01-16 11:22:01 -0800396 if (server_->getHeaderTransport()) {
397 inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +0100398 factoryOutputTransport_);
Dave Watson792db4e2015-01-16 11:22:01 -0800399 outputProtocol_ = inputProtocol_;
400 } else {
401 inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
402 outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
403 }
David Reiss105961d2010-10-06 17:10:17 +0000404
405 // Set up for any server event handler
406 serverEventHandler_ = server_->getEventHandler();
Roger Meier72957452013-06-29 00:28:50 +0200407 if (serverEventHandler_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100408 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
David Reiss105961d2010-10-06 17:10:17 +0000409 } else {
410 connectionContext_ = NULL;
411 }
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000412
413 // Get the processor
Jake Farrellb0d95602011-12-06 01:17:26 +0000414 processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000415}
416
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000417void TNonblockingServer::TConnection::workSocket() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100418 int got = 0, left = 0, sent = 0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000419 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000420
421 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000422 case SOCKET_RECV_FRAMING:
423 union {
424 uint8_t buf[sizeof(uint32_t)];
Roger Meier3781c242011-12-11 20:07:21 +0000425 uint32_t size;
David Reiss89a12942010-10-06 17:10:52 +0000426 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000427
David Reiss89a12942010-10-06 17:10:52 +0000428 // if we've already received some bytes we kept them here
429 framing.size = readWant_;
430 // determine size of this frame
431 try {
432 // Read from the socket
433 fetch = tSocket_->read(&framing.buf[readBufferPos_],
434 uint32_t(sizeof(framing.size) - readBufferPos_));
435 if (fetch == 0) {
436 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000437 close();
438 return;
439 }
David Reiss89a12942010-10-06 17:10:52 +0000440 readBufferPos_ += fetch;
441 } catch (TTransportException& te) {
442 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
443 close();
444
445 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000446 }
447
David Reiss89a12942010-10-06 17:10:52 +0000448 if (readBufferPos_ < sizeof(framing.size)) {
449 // more needed before frame size is known -- save what we have so far
450 readWant_ = framing.size;
451 return;
452 }
453
454 readWant_ = ntohl(framing.size);
Roger Meier3781c242011-12-11 20:07:21 +0000455 if (readWant_ > server_->getMaxFrameSize()) {
456 // Don't allow giant frame sizes. This prevents bad clients from
457 // causing us to try and allocate a giant buffer.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100458 GlobalOutput.printf(
459 "TNonblockingServer: frame size too large "
460 "(%" PRIu32 " > %" PRIu64
461 ") from client %s. "
462 "Remote side not using TFramedTransport?",
463 readWant_,
464 (uint64_t)server_->getMaxFrameSize(),
465 tSocket_->getSocketInfo().c_str());
David Reiss89a12942010-10-06 17:10:52 +0000466 close();
467 return;
468 }
469 // size known; now get the rest of the frame
470 transition();
471 return;
472
473 case SOCKET_RECV:
474 // It is an error to be in this state if we already have all the data
475 assert(readBufferPos_ < readWant_);
476
David Reiss105961d2010-10-06 17:10:17 +0000477 try {
478 // Read from the socket
479 fetch = readWant_ - readBufferPos_;
480 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100481 } catch (TTransportException& te) {
David Reiss105961d2010-10-06 17:10:17 +0000482 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
483 close();
Mark Slee79b16942007-11-26 19:05:29 +0000484
David Reiss105961d2010-10-06 17:10:17 +0000485 return;
486 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000487
Mark Slee2f6404d2006-10-10 01:37:40 +0000488 if (got > 0) {
489 // Move along in the buffer
490 readBufferPos_ += got;
491
492 // Check that we did not overdo it
493 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000494
Mark Slee2f6404d2006-10-10 01:37:40 +0000495 // We are done reading, move onto the next state
496 if (readBufferPos_ == readWant_) {
497 transition();
498 }
499 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000500 }
501
502 // Whenever we get down here it means a remote disconnect
503 close();
Mark Slee79b16942007-11-26 19:05:29 +0000504
Mark Slee2f6404d2006-10-10 01:37:40 +0000505 return;
506
507 case SOCKET_SEND:
508 // Should never have position past size
509 assert(writeBufferPos_ <= writeBufferSize_);
510
511 // If there is no data to send, then let us move on
512 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000513 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000514 transition();
515 return;
516 }
517
David Reiss105961d2010-10-06 17:10:17 +0000518 try {
519 left = writeBufferSize_ - writeBufferPos_;
520 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100521 } catch (TTransportException& te) {
David Reiss105961d2010-10-06 17:10:17 +0000522 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000523 close();
524 return;
525 }
526
527 writeBufferPos_ += sent;
528
529 // Did we overdo it?
530 assert(writeBufferPos_ <= writeBufferSize_);
531
Mark Slee79b16942007-11-26 19:05:29 +0000532 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000533 if (writeBufferPos_ == writeBufferSize_) {
534 transition();
535 }
536
537 return;
538
539 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000540 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000541 assert(0);
542 }
543}
544
Dave Watson792db4e2015-01-16 11:22:01 -0800545bool TNonblockingServer::getHeaderTransport() {
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +0100546 // Currently if there is no output protocol factory,
Dave Watson792db4e2015-01-16 11:22:01 -0800547 // we assume header transport (without having to create
548 // a new transport and check)
549 return getOutputProtocolFactory() == NULL;
550}
551
Mark Slee2f6404d2006-10-10 01:37:40 +0000552/**
553 * This is called when the application transitions from one state into
554 * another. This means that it has finished writing the data that it needed
555 * to, or finished receiving the data that it needed to.
556 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000557void TNonblockingServer::TConnection::transition() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000558 // ensure this connection is active right now
559 assert(ioThread_);
560 assert(server_);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000561
Mark Slee2f6404d2006-10-10 01:37:40 +0000562 // Switch upon the state that we are currently in and move to a new state
563 switch (appState_) {
564
565 case APP_READ_REQUEST:
566 // We are done reading the request, package the read buffer into transport
567 // and get back some data from the dispatch function
Dave Watson792db4e2015-01-16 11:22:01 -0800568 if (server_->getHeaderTransport()) {
569 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
570 outputTransport_->resetBuffer();
571 } else {
572 // We saved room for the framing size in case header transport needed it,
573 // but just skip it for the non-header case
574 inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
575 outputTransport_->resetBuffer();
576
577 // Prepend four bytes of blank space to the buffer so we can
578 // write the frame size there later.
579 outputTransport_->getWritePtr(4);
580 outputTransport_->wroteBytes(4);
581 }
Mark Slee79b16942007-11-26 19:05:29 +0000582
David Reiss01fe1532010-03-09 05:19:25 +0000583 server_->incrementActiveProcessors();
584
Mark Sleee02385b2007-06-09 01:21:16 +0000585 if (server_->isThreadPoolProcessing()) {
586 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000587
David Reiss01fe1532010-03-09 05:19:25 +0000588 // Create task and dispatch to the thread manager
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100589 boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
590 new Task(processor_, inputProtocol_, outputProtocol_, this));
David Reiss01fe1532010-03-09 05:19:25 +0000591 // The application is now waiting on the task to finish
592 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000593
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100594 try {
595 server_->addTask(task);
596 } catch (IllegalStateException& ise) {
597 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
598 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
599 close();
Jens Geyerfb05cf62014-12-04 21:49:07 +0100600 } catch (TimedOutException& to) {
601 GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
602 close();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100603 }
Mark Slee402ee282007-08-23 01:43:20 +0000604
David Reiss01fe1532010-03-09 05:19:25 +0000605 // Set this connection idle so that libevent doesn't process more
606 // data on it while we're still waiting for the threadmanager to
607 // finish this task
608 setIdle();
609 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000610 } else {
611 try {
Roger Meier72957452013-06-29 00:28:50 +0200612 if (serverEventHandler_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100613 serverEventHandler_->processContext(connectionContext_, getTSocket());
Roger Meier72957452013-06-29 00:28:50 +0200614 }
Mark Sleee02385b2007-06-09 01:21:16 +0000615 // Invoke the processor
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100616 processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
617 } catch (const TTransportException& ttx) {
618 GlobalOutput.printf(
619 "TNonblockingServer transport error in "
620 "process(): %s",
621 ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000622 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000623 close();
624 return;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100625 } catch (const std::exception& x) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000626 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100627 typeid(x).name(),
628 x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000629 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000630 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000631 return;
632 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000633 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000634 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000635 close();
636 return;
637 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000638 }
639
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100640 // Intentionally fall through here, the call to process has written into
641 // the writeBuffer_
Mark Slee402ee282007-08-23 01:43:20 +0000642
Mark Sleee02385b2007-06-09 01:21:16 +0000643 case APP_WAIT_TASK:
644 // We have now finished processing a task and the result has been written
645 // into the outputTransport_, so we grab its contents and place them into
646 // the writeBuffer_ for actual writing by the libevent thread
647
David Reiss01fe1532010-03-09 05:19:25 +0000648 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000649 // Get the result of the operation
650 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
651
652 // If the function call generated return data, then move into the send
653 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000654 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000655 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000656
657 // Move into write state
658 writeBufferPos_ = 0;
659 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000660
David Reissaf787782008-07-03 20:29:34 +0000661 // Put the frame size into the write buffer
662 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
663 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000664
665 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000666 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000667 setWrite();
668
669 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000670 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000671
672 return;
673 }
674
David Reissc51986f2009-03-24 20:01:25 +0000675 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000676 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000677 goto LABEL_APP_INIT;
678
Mark Slee2f6404d2006-10-10 01:37:40 +0000679 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000680 // it's now safe to perform buffer size housekeeping.
681 if (writeBufferSize_ > largestWriteBufferSize_) {
682 largestWriteBufferSize_ = writeBufferSize_;
683 }
684 if (server_->getResizeBufferEveryN() > 0
685 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
686 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
687 server_->getIdleWriteBufferLimit());
688 callsForResize_ = 0;
689 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000690
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100691 // N.B.: We also intentionally fall through here into the INIT state!
Mark Slee2f6404d2006-10-10 01:37:40 +0000692
Mark Slee92f00fb2006-10-25 01:28:17 +0000693 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 case APP_INIT:
695
696 // Clear write buffer variables
697 writeBuffer_ = NULL;
698 writeBufferPos_ = 0;
699 writeBufferSize_ = 0;
700
Mark Slee2f6404d2006-10-10 01:37:40 +0000701 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000702 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000703 appState_ = APP_READ_FRAME_SIZE;
704
David Reiss89a12942010-10-06 17:10:52 +0000705 readBufferPos_ = 0;
706
Mark Slee2f6404d2006-10-10 01:37:40 +0000707 // Register read event
708 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000709
Mark Slee2f6404d2006-10-10 01:37:40 +0000710 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000711 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000712
713 return;
714
715 case APP_READ_FRAME_SIZE:
Dave Watson792db4e2015-01-16 11:22:01 -0800716 readWant_ += 4;
717
David Reiss89a12942010-10-06 17:10:52 +0000718 // We just read the request length
719 // Double the buffer size until it is big enough
720 if (readWant_ > readBufferSize_) {
721 if (readBufferSize_ == 0) {
722 readBufferSize_ = 1;
723 }
724 uint32_t newSize = readBufferSize_;
725 while (readWant_ > newSize) {
726 newSize *= 2;
727 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000728
David Reiss89a12942010-10-06 17:10:52 +0000729 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
730 if (newBuffer == NULL) {
731 // nothing else to be done...
732 throw std::bad_alloc();
733 }
734 readBuffer_ = newBuffer;
735 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000736 }
737
Dave Watson792db4e2015-01-16 11:22:01 -0800738 readBufferPos_ = 4;
739 *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000740
741 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000742 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000743 appState_ = APP_READ_REQUEST;
744
745 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000746 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000747
748 return;
749
David Reiss01fe1532010-03-09 05:19:25 +0000750 case APP_CLOSE_CONNECTION:
751 server_->decrementActiveProcessors();
752 close();
753 return;
754
Mark Slee2f6404d2006-10-10 01:37:40 +0000755 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000756 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000757 assert(0);
758 }
759}
760
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000761void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000762 // Catch the do nothing case
763 if (eventFlags_ == eventFlags) {
764 return;
765 }
766
767 // Delete a previously existing event
768 if (eventFlags_ != 0) {
769 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000770 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000771 return;
772 }
773 }
774
775 // Update in memory structure
776 eventFlags_ = eventFlags;
777
Mark Slee402ee282007-08-23 01:43:20 +0000778 // Do not call event_set if there are no flags
779 if (!eventFlags_) {
780 return;
781 }
782
David Reiss01fe1532010-03-09 05:19:25 +0000783 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000784 * event_set:
785 *
786 * Prepares the event structure &event to be used in future calls to
787 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000788 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000789 *
790 * The events can be either EV_READ, EV_WRITE, or both, indicating
791 * that an application can read or write from the file respectively without
792 * blocking.
793 *
Mark Sleee02385b2007-06-09 01:21:16 +0000794 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000795 * the event and the type of event which will be one of: EV_TIMEOUT,
796 * EV_SIGNAL, EV_READ, EV_WRITE.
797 *
798 * The additional flag EV_PERSIST makes an event_add() persistent until
799 * event_del() has been called.
800 *
801 * Once initialized, the &event struct can be used repeatedly with
802 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000803 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000804 * when an ev structure has been added to libevent using event_add() the
805 * structure must persist until the event occurs (assuming EV_PERSIST
806 * is not set) or is removed using event_del(). You may not reuse the same
807 * ev structure for multiple monitored descriptors; each descriptor needs
808 * its own ev.
809 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100810 event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
Jake Farrellb0d95602011-12-06 01:17:26 +0000811 event_base_set(ioThread_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000812
813 // Add the event
814 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000815 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000816 }
817}
818
819/**
820 * Closes a connection
821 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000822void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000823 // Delete the registered libevent
824 if (event_del(&event_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400825 GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
David Reiss105961d2010-10-06 17:10:17 +0000826 }
827
Roger Meier72957452013-06-29 00:28:50 +0200828 if (serverEventHandler_) {
David Reiss105961d2010-10-06 17:10:17 +0000829 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000830 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000831 ioThread_ = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000832
833 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000834 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000835
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000836 // close any factory produced transports
837 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000838 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000839
Roger Meier464a3a42014-07-07 21:48:28 +0200840 // release processor and handler
841 processor_.reset();
842
Mark Slee2f6404d2006-10-10 01:37:40 +0000843 // Give this object back to the server that owns it
844 server_->returnConnection(this);
845}
846
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100847void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000848 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000849 free(readBuffer_);
850 readBuffer_ = NULL;
851 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000852 }
David Reiss54bec5d2010-10-06 17:10:45 +0000853
854 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
855 // just start over
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400856 outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
David Reiss54bec5d2010-10-06 17:10:45 +0000857 largestWriteBufferSize_ = 0;
858 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000859}
860
David Reiss8ede8182010-09-02 15:26:28 +0000861TNonblockingServer::~TNonblockingServer() {
Roger Meier0c04fcc2013-03-22 19:52:08 +0100862 // Close any active connections (moves them to the idle connection stack)
863 while (activeConnections_.size()) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100864 activeConnections_.front()->close();
Roger Meier0c04fcc2013-03-22 19:52:08 +0100865 }
David Reiss8ede8182010-09-02 15:26:28 +0000866 // Clean up unused TConnection objects in connectionStack_
867 while (!connectionStack_.empty()) {
868 TConnection* connection = connectionStack_.top();
869 connectionStack_.pop();
870 delete connection;
871 }
Roger Meier0c04fcc2013-03-22 19:52:08 +0100872 // The TNonblockingIOThread objects have shared_ptrs to the Thread
873 // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
874 // objects (as runnable) so these objects will never deallocate without help.
875 while (!ioThreads_.empty()) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100876 boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
877 ioThreads_.pop_back();
878 iot->setThread(boost::shared_ptr<Thread>());
Roger Meier0c04fcc2013-03-22 19:52:08 +0100879 }
David Reiss8ede8182010-09-02 15:26:28 +0000880}
881
Mark Slee2f6404d2006-10-10 01:37:40 +0000882/**
883 * Creates a new connection either by reusing an object off the stack or
884 * by allocating a new one entirely
885 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100886TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
887 const sockaddr* addr,
888 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000889 // Check the stack
Jake Farrellb0d95602011-12-06 01:17:26 +0000890 Guard g(connMutex_);
891
892 // pick an IO thread to handle this connection -- currently round robin
Jake Farrellb0d95602011-12-06 01:17:26 +0000893 assert(nextIOThread_ < ioThreads_.size());
894 int selectedThreadIdx = nextIOThread_;
Ben Craig64935232013-10-09 15:21:38 -0500895 nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +0000896
897 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
898
899 // Check the connection stack to see if we can re-use
900 TConnection* result = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000901 if (connectionStack_.empty()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000902 result = new TConnection(socket, ioThread, addr, addrLen);
903 ++numTConnections_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000904 } else {
Jake Farrellb0d95602011-12-06 01:17:26 +0000905 result = connectionStack_.top();
Mark Slee2f6404d2006-10-10 01:37:40 +0000906 connectionStack_.pop();
Jake Farrellb0d95602011-12-06 01:17:26 +0000907 result->init(socket, ioThread, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000908 }
Roger Meier0c04fcc2013-03-22 19:52:08 +0100909 activeConnections_.push_back(result);
Jake Farrellb0d95602011-12-06 01:17:26 +0000910 return result;
Mark Slee2f6404d2006-10-10 01:37:40 +0000911}
912
913/**
914 * Returns a connection to the stack
915 */
916void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000917 Guard g(connMutex_);
918
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100919 activeConnections_.erase(std::remove(activeConnections_.begin(),
920 activeConnections_.end(),
921 connection),
922 activeConnections_.end());
Roger Meier0c04fcc2013-03-22 19:52:08 +0100923
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100924 if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000925 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000926 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000927 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000928 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000929 connectionStack_.push(connection);
930 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000931}
932
933/**
David Reissa79e4882008-03-05 07:51:47 +0000934 * Server socket had something happen. We accept all waiting client
935 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000936 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400937void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100938 (void)which;
David Reiss3bb5e052010-01-25 19:31:31 +0000939 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000940 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000941
Mark Slee2f6404d2006-10-10 01:37:40 +0000942 // Server socket accepted a new connection
943 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000944 sockaddr_storage addrStorage;
945 sockaddr* addrp = (sockaddr*)&addrStorage;
946 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000947
Mark Slee2f6404d2006-10-10 01:37:40 +0000948 // Going to accept a new client socket
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400949 THRIFT_SOCKET clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000950
Mark Slee2f6404d2006-10-10 01:37:40 +0000951 // Accept as many new clients as possible, even though libevent signaled only
952 // one, this helps us to avoid having to go back into the libevent engine so
953 // many times
David Reiss105961d2010-10-06 17:10:17 +0000954 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000955 // If we're overloaded, take action here
956 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000957 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000958 nConnectionsDropped_++;
959 nTotalConnectionsDropped_++;
960 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400961 ::THRIFT_CLOSESOCKET(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000962 return;
David Reiss01fe1532010-03-09 05:19:25 +0000963 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
964 if (!drainPendingTask()) {
965 // Nothing left to discard, so we drop connection instead.
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400966 ::THRIFT_CLOSESOCKET(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000967 return;
David Reiss01fe1532010-03-09 05:19:25 +0000968 }
969 }
970 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000971
Mark Slee2f6404d2006-10-10 01:37:40 +0000972 // Explicitly set this socket to NONBLOCK mode
973 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100974 if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
975 || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
976 GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
977 THRIFT_GET_SOCKET_ERROR);
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400978 ::THRIFT_CLOSESOCKET(clientSocket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000979 return;
980 }
981
982 // Create a new TConnection for this client socket.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100983 TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000984
985 // Fail fast if we could not create a TConnection object
986 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000987 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400988 ::THRIFT_CLOSESOCKET(clientSocket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000989 return;
990 }
991
Jake Farrellb0d95602011-12-06 01:17:26 +0000992 /*
993 * Either notify the ioThread that is assigned this connection to
994 * start processing, or if it is us, we'll just ask this
995 * connection to do its initial state change here.
996 *
997 * (We need to avoid writing to our own notification pipe, to
998 * avoid possible deadlocks if the pipe is full.)
999 *
1000 * The IO thread #0 is the only one that handles these listen
1001 * events, so unless the connection has been assigned to thread #0
1002 * we know it's not on our thread.
1003 */
1004 if (clientConnection->getIOThreadNumber() == 0) {
1005 clientConnection->transition();
1006 } else {
Jens Geyerfb05cf62014-12-04 21:49:07 +01001007 if (!clientConnection->notifyIOThread()) {
1008 GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
1009 returnConnection(clientConnection);
1010 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001011 }
David Reiss3e7fca42009-09-19 01:59:13 +00001012
1013 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +00001014 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +00001015 }
Mark Slee79b16942007-11-26 19:05:29 +00001016
Mark Slee2f6404d2006-10-10 01:37:40 +00001017 // Done looping accept, now we have to make sure the error is due to
1018 // blocking. Any other error is a problem
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001019 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
1020 GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
Mark Slee2f6404d2006-10-10 01:37:40 +00001021 }
1022}
1023
1024/**
Mark Slee79b16942007-11-26 19:05:29 +00001025 * Creates a socket to listen on and binds it to the local port.
Mark Slee2f6404d2006-10-10 01:37:40 +00001026 */
Jake Farrellb0d95602011-12-06 01:17:26 +00001027void TNonblockingServer::createAndListenOnSocket() {
Antonio Di Monaco796667b2016-01-04 23:05:19 +01001028#ifdef _WIN32
1029 TWinsockSingleton::create();
1030#endif // _WIN32
1031
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001032 THRIFT_SOCKET s;
Jake Farrellb0d95602011-12-06 01:17:26 +00001033
Mark Sleefb4b5142007-11-20 01:27:08 +00001034 struct addrinfo hints, *res, *res0;
1035 int error;
Mark Slee79b16942007-11-26 19:05:29 +00001036
Mark Sleefb4b5142007-11-20 01:27:08 +00001037 char port[sizeof("65536") + 1];
1038 memset(&hints, 0, sizeof(hints));
1039 hints.ai_family = PF_UNSPEC;
1040 hints.ai_socktype = SOCK_STREAM;
Mark Slee256bdc42007-11-27 08:42:19 +00001041 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
Mark Sleefb4b5142007-11-20 01:27:08 +00001042 sprintf(port, "%d", port_);
1043
1044 // Wildcard address
1045 error = getaddrinfo(NULL, port, &hints, &res0);
1046 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001047 throw TException("TNonblockingServer::serve() getaddrinfo "
1048 + string(THRIFT_GAI_STRERROR(error)));
Mark Sleefb4b5142007-11-20 01:27:08 +00001049 }
1050
1051 // Pick the ipv6 address first since ipv4 addresses can be mapped
1052 // into ipv6 space.
1053 for (res = res0; res; res = res->ai_next) {
1054 if (res->ai_family == AF_INET6 || res->ai_next == NULL)
1055 break;
1056 }
1057
Mark Slee2f6404d2006-10-10 01:37:40 +00001058 // Create the server socket
Mark Slee79b16942007-11-26 19:05:29 +00001059 s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
1060 if (s == -1) {
1061 freeaddrinfo(res0);
1062 throw TException("TNonblockingServer::serve() socket() -1");
Mark Slee2f6404d2006-10-10 01:37:40 +00001063 }
1064
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001065#ifdef IPV6_V6ONLY
David Reisseee98be2010-03-09 05:20:10 +00001066 if (res->ai_family == AF_INET6) {
1067 int zero = 0;
Roger Meier30aae0c2011-07-08 12:23:31 +00001068 if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
David Reisseee98be2010-03-09 05:20:10 +00001069 GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
1070 }
David Reiss13aea462008-06-10 22:56:04 +00001071 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001072#endif // #ifdef IPV6_V6ONLY
David Reiss13aea462008-06-10 22:56:04 +00001073
Mark Slee79b16942007-11-26 19:05:29 +00001074 int one = 1;
1075
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001076 // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
1077 setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
Mark Slee79b16942007-11-26 19:05:29 +00001078
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001079 if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
1080 ::THRIFT_CLOSESOCKET(s);
Mark Slee79b16942007-11-26 19:05:29 +00001081 freeaddrinfo(res0);
Roger Meierd8f50f32012-04-11 21:48:56 +00001082 throw TTransportException(TTransportException::NOT_OPEN,
1083 "TNonblockingServer::serve() bind",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001084 THRIFT_GET_SOCKET_ERROR);
Mark Slee79b16942007-11-26 19:05:29 +00001085 }
1086
1087 // Done with the addr info
1088 freeaddrinfo(res0);
1089
1090 // Set up this file descriptor for listening
1091 listenSocket(s);
1092}
1093
1094/**
1095 * Takes a socket created by listenSocket() and sets various options on it
1096 * to prepare for use in the server.
1097 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001098void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001099 // Set socket to nonblocking mode
1100 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001101 if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
1102 || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001103 ::THRIFT_CLOSESOCKET(s);
1104 throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001105 }
1106
1107 int one = 1;
1108 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001109
1110 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001111 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001112
1113 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001114 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001115
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001116// Set TCP nodelay if available, MAC OS X Hack
1117// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1118#ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001119 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001120#endif
Mark Slee2f6404d2006-10-10 01:37:40 +00001121
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001122#ifdef TCP_LOW_MIN_RTO
David Reiss1c20c872010-03-09 05:20:14 +00001123 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001124 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001125 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001126#endif
David Reiss1c20c872010-03-09 05:20:14 +00001127
Mark Slee79b16942007-11-26 19:05:29 +00001128 if (listen(s, LISTEN_BACKLOG) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001129 ::THRIFT_CLOSESOCKET(s);
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +09001130 throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001131 }
1132
Mark Slee79b16942007-11-26 19:05:29 +00001133 // Cool, this socket is good to go, set it as the serverSocket_
1134 serverSocket_ = s;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001135
1136 if (!port_) {
Antonio Di Monaco796667b2016-01-04 23:05:19 +01001137 struct sockaddr_storage addr;
Konrad Grochowski87bb7712015-04-30 10:48:30 +02001138 socklen_t size = sizeof(addr);
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001139 if (!getsockname(serverSocket_, reinterpret_cast<sockaddr*>(&addr), &size)) {
Antonio Di Monaco796667b2016-01-04 23:05:19 +01001140 if (addr.ss_family == AF_INET6) {
1141 const struct sockaddr_in6* sin = reinterpret_cast<const struct sockaddr_in6*>(&addr);
1142 listenPort_ = ntohs(sin->sin6_port);
1143 } else {
1144 const struct sockaddr_in* sin = reinterpret_cast<const struct sockaddr_in*>(&addr);
1145 listenPort_ = ntohs(sin->sin_port);
1146 }
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001147 } else {
1148 GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR);
1149 }
1150 }
Mark Slee79b16942007-11-26 19:05:29 +00001151}
1152
David Reiss068f4162010-03-09 05:19:45 +00001153void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1154 threadManager_ = threadManager;
Roger Meier72957452013-06-29 00:28:50 +02001155 if (threadManager) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001156 threadManager->setExpireCallback(
1157 apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
1158 this,
1159 apache::thrift::stdcxx::placeholders::_1));
David Reiss068f4162010-03-09 05:19:45 +00001160 threadPoolProcessing_ = true;
1161 } else {
1162 threadPoolProcessing_ = false;
1163 }
1164}
1165
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001166bool TNonblockingServer::serverOverloaded() {
David Reiss01fe1532010-03-09 05:19:25 +00001167 size_t activeConnections = numTConnections_ - connectionStack_.size();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001168 if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
David Reiss01fe1532010-03-09 05:19:25 +00001169 if (!overloaded_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001170 GlobalOutput.printf("TNonblockingServer: overload condition begun.");
David Reiss01fe1532010-03-09 05:19:25 +00001171 overloaded_ = true;
1172 }
1173 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001174 if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
1175 && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1176 GlobalOutput.printf(
1177 "TNonblockingServer: overload ended; "
1178 "%u dropped (%llu total)",
1179 nConnectionsDropped_,
1180 nTotalConnectionsDropped_);
David Reiss01fe1532010-03-09 05:19:25 +00001181 nConnectionsDropped_ = 0;
1182 overloaded_ = false;
1183 }
1184 }
1185
1186 return overloaded_;
1187}
1188
1189bool TNonblockingServer::drainPendingTask() {
1190 if (threadManager_) {
1191 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1192 if (task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001193 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1194 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss01fe1532010-03-09 05:19:25 +00001195 connection->forceClose();
1196 return true;
1197 }
1198 }
1199 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001200}
1201
David Reiss068f4162010-03-09 05:19:45 +00001202void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001203 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1204 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001205 connection->forceClose();
1206}
1207
Jake Farrellb0d95602011-12-06 01:17:26 +00001208void TNonblockingServer::stop() {
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001209 if (!port_) {
1210 listenPort_ = 0;
1211 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001212 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001213 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001214 ioThreads_[i]->stop();
1215 }
1216}
1217
Roger Meier6f2a5032013-07-08 23:35:25 +02001218void TNonblockingServer::registerEvents(event_base* user_event_base) {
1219 userEventBase_ = user_event_base;
1220
Jake Farrellb0d95602011-12-06 01:17:26 +00001221 // init listen socket
Roger Meiere802aa42013-07-19 21:10:54 +02001222 if (serverSocket_ == THRIFT_INVALID_SOCKET)
Roger Meier6f2a5032013-07-08 23:35:25 +02001223 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001224
Jake Farrellb0d95602011-12-06 01:17:26 +00001225 // set up the IO threads
1226 assert(ioThreads_.empty());
1227 if (!numIOThreads_) {
1228 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001229 }
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001230 // User-provided event-base doesn't works for multi-threaded servers
1231 assert(numIOThreads_ == 1 || !userEventBase_);
David Reiss01fe1532010-03-09 05:19:25 +00001232
Roger Meierd0cdecf2011-12-08 19:34:01 +00001233 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001234 // the first IO thread also does the listening on server socket
Roger Meier0be9ffa2013-07-19 21:10:01 +02001235 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
Mark Slee2f6404d2006-10-10 01:37:40 +00001236
Jake Farrellb0d95602011-12-06 01:17:26 +00001237 shared_ptr<TNonblockingIOThread> thread(
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001238 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
Jake Farrellb0d95602011-12-06 01:17:26 +00001239 ioThreads_.push_back(thread);
1240 }
1241
1242 // Notify handler of the preServe event
Roger Meier72957452013-06-29 00:28:50 +02001243 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001244 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001245 }
1246
Jake Farrellb0d95602011-12-06 01:17:26 +00001247 // Start all of our helper IO threads. Note that the threads run forever,
1248 // only terminating if stop() is called.
1249 assert(ioThreads_.size() == numIOThreads_);
1250 assert(ioThreads_.size() > 0);
1251
1252 GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001253 listenPort_,
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001254 ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +00001255
1256 // Launch all the secondary IO threads in separate threads
1257 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001258 ioThreadFactory_.reset(new PlatformThreadFactory(
Nobuaki Sukegawa28256642014-12-16 03:24:37 +09001259#if !USE_BOOST_THREAD && !USE_STD_THREAD
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001260 PlatformThreadFactory::OTHER, // scheduler
1261 PlatformThreadFactory::NORMAL, // priority
1262 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001263#endif
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001264 false // detached
1265 ));
Jake Farrellb0d95602011-12-06 01:17:26 +00001266
1267 assert(ioThreadFactory_.get());
1268
1269 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001270 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001271 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1272 ioThreads_[i]->setThread(thread);
1273 thread->start();
1274 }
1275 }
1276
Roger Meier6f2a5032013-07-08 23:35:25 +02001277 // Register the events for the primary (listener) IO thread
1278 ioThreads_[0]->registerEvents();
1279}
1280
1281/**
1282 * Main workhorse function, starts up the server listening on a port and
1283 * loops over the libevent handler.
1284 */
1285void TNonblockingServer::serve() {
1286
Konrad Grochowski1f6e3802015-05-18 18:10:06 +02001287 if (ioThreads_.empty())
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001288 registerEvents(NULL);
Roger Meier6f2a5032013-07-08 23:35:25 +02001289
Jake Farrellb0d95602011-12-06 01:17:26 +00001290 // Run the primary (listener) IO thread loop in our main thread; this will
1291 // only return when the server is shutting down.
1292 ioThreads_[0]->run();
1293
1294 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001295 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001296 ioThreads_[i]->join();
1297 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1298 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001299}
1300
Jake Farrellb0d95602011-12-06 01:17:26 +00001301TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1302 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001303 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001304 bool useHighPriority)
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001305 : server_(server),
1306 number_(number),
1307 listenSocket_(listenSocket),
1308 useHighPriority_(useHighPriority),
1309 eventBase_(NULL),
1310 ownEventBase_(false) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001311 notificationPipeFDs_[0] = -1;
1312 notificationPipeFDs_[1] = -1;
1313}
1314
1315TNonblockingIOThread::~TNonblockingIOThread() {
1316 // make sure our associated thread is fully finished
1317 join();
1318
Roger Meier6f2a5032013-07-08 23:35:25 +02001319 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001320 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001321 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001322 }
1323
gzshi41945622017-01-06 10:47:03 +08001324 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001325 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001326 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001327 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001328 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001329 }
1330
1331 for (int i = 0; i < 2; ++i) {
1332 if (notificationPipeFDs_[i] >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001333 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001334 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001335 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001336 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001337 notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001338 }
1339 }
1340}
1341
1342void TNonblockingIOThread::createNotificationPipe() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001343 if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
Roger Meier12d70532011-12-14 23:35:28 +00001344 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
Jake Farrellb0d95602011-12-06 01:17:26 +00001345 throw TException("can't create notification pipe");
1346 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001347 if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1348 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001349 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1350 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1351 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
Jake Farrellb0d95602011-12-06 01:17:26 +00001352 }
1353 for (int i = 0; i < 2; ++i) {
Roger Meier12d70532011-12-14 23:35:28 +00001354#if LIBEVENT_VERSION_NUMBER < 0x02000000
1355 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001356 if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
1357 || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
Roger Meier12d70532011-12-14 23:35:28 +00001358#else
1359 if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
1360#endif
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001361 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1362 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001363 throw TException(
1364 "TNonblockingServer::createNotificationPipe() "
1365 "FD_CLOEXEC");
Jake Farrellb0d95602011-12-06 01:17:26 +00001366 }
1367 }
1368}
1369
1370/**
1371 * Register the core libevent events onto the proper base.
1372 */
1373void TNonblockingIOThread::registerEvents() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001374 threadId_ = Thread::get_current();
1375
1376 assert(eventBase_ == 0);
1377 eventBase_ = getServer()->getUserEventBase();
1378 if (eventBase_ == NULL) {
1379 eventBase_ = event_base_new();
1380 ownEventBase_ = true;
1381 }
1382
1383 // Print some libevent stats
1384 if (number_ == 0) {
1385 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001386 event_get_version(),
1387 event_base_get_method(eventBase_));
Roger Meier6f2a5032013-07-08 23:35:25 +02001388 }
1389
gzshi41945622017-01-06 10:47:03 +08001390 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001391 // Register the server event
1392 event_set(&serverEvent_,
1393 listenSocket_,
1394 EV_READ | EV_PERSIST,
1395 TNonblockingIOThread::listenHandler,
1396 server_);
1397 event_base_set(eventBase_, &serverEvent_);
1398
1399 // Add the event and start up the server
1400 if (-1 == event_add(&serverEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001401 throw TException(
1402 "TNonblockingServer::serve(): "
1403 "event_add() failed on server listen event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001404 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001405 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001406 }
1407
1408 createNotificationPipe();
1409
1410 // Create an event to be notified when a task finishes
1411 event_set(&notificationEvent_,
1412 getNotificationRecvFD(),
1413 EV_READ | EV_PERSIST,
1414 TNonblockingIOThread::notifyHandler,
1415 this);
1416
1417 // Attach to the base
1418 event_base_set(eventBase_, &notificationEvent_);
1419
1420 // Add the event and start up the server
1421 if (-1 == event_add(&notificationEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001422 throw TException(
1423 "TNonblockingServer::serve(): "
1424 "event_add() failed on task-done notification event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001425 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001426 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001427}
1428
1429bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001430 THRIFT_SOCKET fd = getNotificationSendFD();
Jake Farrellb0d95602011-12-06 01:17:26 +00001431 if (fd < 0) {
1432 return false;
1433 }
1434
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001435 fd_set wfds, efds;
tpcwangf98d59f2016-03-23 16:18:52 -07001436 long ret = -1;
1437 long kSize = sizeof(conn);
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +09001438 const char* pos = reinterpret_cast<const char*>(&conn);
abadcafe38772c92015-04-03 22:23:04 +08001439
1440 while (kSize > 0) {
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001441 FD_ZERO(&wfds);
1442 FD_ZERO(&efds);
1443 FD_SET(fd, &wfds);
1444 FD_SET(fd, &efds);
tpcwangf98d59f2016-03-23 16:18:52 -07001445 ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
abadcafe38772c92015-04-03 22:23:04 +08001446 if (ret < 0) {
1447 return false;
1448 } else if (ret == 0) {
1449 continue;
1450 }
1451
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001452 if (FD_ISSET(fd, &efds)) {
1453 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001454 return false;
1455 }
1456
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001457 if (FD_ISSET(fd, &wfds)) {
abadcafe38772c92015-04-03 22:23:04 +08001458 ret = send(fd, pos, kSize, 0);
1459 if (ret < 0) {
1460 if (errno == EAGAIN) {
1461 continue;
1462 }
1463
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001464 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001465 return false;
1466 }
1467
1468 kSize -= ret;
1469 pos += ret;
1470 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001471 }
1472
1473 return true;
1474}
1475
1476/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001477void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001478 TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
Jake Farrellb0d95602011-12-06 01:17:26 +00001479 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001480 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001481
1482 while (true) {
1483 TNonblockingServer::TConnection* connection = 0;
1484 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001485 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001486 if (nBytes == kSize) {
1487 if (connection == NULL) {
1488 // this is the command to stop our thread, exit the handler!
1489 return;
1490 }
1491 connection->transition();
1492 } else if (nBytes > 0) {
1493 // throw away these bytes and hope that next time we get a solid read
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001494 GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
Jake Farrellb0d95602011-12-06 01:17:26 +00001495 ioThread->breakLoop(true);
1496 return;
1497 } else if (nBytes == 0) {
1498 GlobalOutput.printf("notifyHandler: Notify socket closed!");
1499 // exit the loop
1500 break;
1501 } else { // nBytes < 0
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001502 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1503 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1504 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1505 ioThread->breakLoop(true);
1506 return;
Jake Farrellb0d95602011-12-06 01:17:26 +00001507 }
1508 // exit the loop
1509 break;
1510 }
1511 }
1512}
1513
1514void TNonblockingIOThread::breakLoop(bool error) {
1515 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001516 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001517 // TODO: figure out something better to do here, but for now kill the
1518 // whole process.
1519 GlobalOutput.printf("TNonblockingServer: aborting process.");
1520 ::abort();
1521 }
1522
1523 // sets a flag so that the loop exits on the next event
Bryan Duxbury76c43682011-08-24 21:26:48 +00001524 event_base_loopbreak(eventBase_);
1525
Jake Farrellb0d95602011-12-06 01:17:26 +00001526 // event_base_loopbreak() only causes the loop to exit the next time
1527 // it wakes up. We need to force it to wake up, in case there are
1528 // no real events it needs to process.
Bryan Duxbury76c43682011-08-24 21:26:48 +00001529 //
Jake Farrellb0d95602011-12-06 01:17:26 +00001530 // If we're running in the same thread, we can't use the notify(0)
1531 // mechanism to stop the thread, but happily if we're running in the
1532 // same thread, this means the thread can't be blocking in the event
1533 // loop either.
Roger Meier12d70532011-12-14 23:35:28 +00001534 if (!Thread::is_current(threadId_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001535 notify(NULL);
1536 }
1537}
1538
1539void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001540#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001541 // Start out with a standard, low-priority setup for the sched params.
1542 struct sched_param sp;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001543 bzero((void*)&sp, sizeof(sp));
Jake Farrellb0d95602011-12-06 01:17:26 +00001544 int policy = SCHED_OTHER;
1545
1546 // If desired, set up high-priority sched params structure.
1547 if (value) {
1548 // FIFO scheduler, ranked above default SCHED_OTHER queue
1549 policy = SCHED_FIFO;
1550 // The priority only compares us to other SCHED_FIFO threads, so we
1551 // just pick a random priority halfway between min & max.
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001552 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
Jake Farrellb0d95602011-12-06 01:17:26 +00001553
1554 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001555 }
1556
Jake Farrellb0d95602011-12-06 01:17:26 +00001557 // Actually set the sched params for the current thread.
1558 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001559 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001560 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001561 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001562 }
Roger Meierd051ca02013-08-15 01:35:11 +02001563#else
1564 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001565#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001566}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001567
Jake Farrellb0d95602011-12-06 01:17:26 +00001568void TNonblockingIOThread::run() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001569 if (eventBase_ == NULL)
1570 registerEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001571
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001572 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001573
1574 if (useHighPriority_) {
1575 setCurrentThreadHighPriority(true);
1576 }
1577
1578 // Run libevent engine, never returns, invokes calls to eventHandler
1579 event_base_loop(eventBase_, 0);
1580
1581 if (useHighPriority_) {
1582 setCurrentThreadHighPriority(false);
1583 }
1584
1585 // cleans up our registered events
1586 cleanupEvents();
1587
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001588 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001589}
1590
1591void TNonblockingIOThread::cleanupEvents() {
1592 // stop the listen socket, if any
gzshi41945622017-01-06 10:47:03 +08001593 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001594 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001595 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001596 }
1597 }
1598
1599 event_del(&notificationEvent_);
1600}
1601
Jake Farrellb0d95602011-12-06 01:17:26 +00001602void TNonblockingIOThread::stop() {
1603 // This should cause the thread to fall out of its event loop ASAP.
1604 breakLoop(false);
1605}
1606
1607void TNonblockingIOThread::join() {
1608 // If this was a thread created by a factory (not the thread that called
1609 // serve()), we join() it to make sure we shut down fully.
1610 if (thread_) {
1611 try {
1612 // Note that it is safe to both join() ourselves twice, as well as join
1613 // the current thread as the pthread implementation checks for deadlock.
1614 thread_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001615 } catch (...) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001616 // swallow everything
1617 }
1618 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001619}
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001620}
1621}
1622} // apache::thrift::server