/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using stdcxx::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };

/**
 * Six states for the nonblocking server:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for the processing task to complete
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
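
// A healthy connection normally cycles through these states (see
// TConnection::transition() below):
//
//   APP_INIT -> APP_READ_FRAME_SIZE -> APP_READ_REQUEST -> APP_WAIT_TASK
//            -> APP_SEND_RESULT -> APP_READ_FRAME_SIZE (next request)
//
// When no thread manager is configured, the processor runs inline on the IO
// thread and the connection never actually sits in APP_WAIT_TASK; oneway
// calls that produce no reply data skip APP_SEND_RESULT as well.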

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  stdcxx::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  stdcxx::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  stdcxx::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  stdcxx::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  stdcxx::shared_ptr<TTransport> factoryInputTransport_;
  stdcxx::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  stdcxx::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  stdcxx::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(stdcxx::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// set socket for connection
  void setSocket(stdcxx::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  stdcxx::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  stdcxx::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(stdcxx::shared_ptr<TProcessor> processor,
       stdcxx::shared_ptr<TProtocol> input,
       stdcxx::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  stdcxx::shared_ptr<TProcessor> processor_;
  stdcxx::shared_ptr<TProtocol> input_;
  stdcxx::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::setSocket(stdcxx::shared_ptr<TSocket> socket) {
  tSocket_ = socket;
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;
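
    // Wire format expected here: each request is a 4-byte frame length in
    // network (big-endian) byte order followed by that many bytes of payload
    // (i.e. what TFramedTransport produces). The union above lets us collect
    // the length prefix byte-by-byte and then reinterpret it as one uint32_t.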

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      // In nonblocking mode some TSSLSocket operations need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution should be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();

        return;
      }
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();

    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
    // despite having more data.
    if (tSocket_->hasPendingDataToRead())
    {
      workSocket();
    }

    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      // In nonblocking mode some TSSLSocket operations need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution should be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
      }

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == NULL;
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

  // Intentionally fall through here, the call to process has written into
  // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
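    // The frame length just parsed is written back into the first 4 bytes of
    // readBuffer_ (in network order) and readBufferPos_ starts at 4, so once
    // the body arrives the buffer holds the complete framed message.
    // APP_READ_REQUEST relies on this: with a header transport the whole
    // buffer (prefix included) is handed over, otherwise the first 4 bytes
    // are skipped.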

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ && event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
    return;
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  setIdle();

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    stdcxx::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(stdcxx::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(stdcxx::shared_ptr<TSocket> socket) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->setSocket(socket);
    result->init(ioThread);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  stdcxx::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}


void TNonblockingServer::setThreadManager(stdcxx::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
                                     this,
                                     apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
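
// Note on the hysteresis check above: overload begins as soon as either limit
// is exceeded, but it only ends once the load falls back below a fraction of
// that limit. For example, with maxActiveProcessors_ = 100 and an
// overloadHysteresis_ of 0.8 (illustrative numbers), overload starts at 101
// active processors and clears only when the count drops to 80 or fewer.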

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    stdcxx::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

void TNonblockingServer::expireClose(stdcxx::shared_ptr<Runnable> task) {
  TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->stop();
  }
}

void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;

  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // A user-provided event base doesn't work for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      static_cast<int>(ioThreads_.size()));
Jake Farrellb0d95602011-12-06 01:17:26 +00001112
1113 // Launch all the secondary IO threads in separate threads
1114 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001115 ioThreadFactory_.reset(new PlatformThreadFactory(
Nobuaki Sukegawa28256642014-12-16 03:24:37 +09001116#if !USE_BOOST_THREAD && !USE_STD_THREAD
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001117 PlatformThreadFactory::OTHER, // scheduler
1118 PlatformThreadFactory::NORMAL, // priority
1119 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001120#endif
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001121 false // detached
1122 ));
Jake Farrellb0d95602011-12-06 01:17:26 +00001123
1124 assert(ioThreadFactory_.get());
1125
1126 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001127 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001128 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1129 ioThreads_[i]->setThread(thread);
1130 thread->start();
1131 }
1132 }
1133
Roger Meier6f2a5032013-07-08 23:35:25 +02001134 // Register the events for the primary (listener) IO thread
1135 ioThreads_[0]->registerEvents();
1136}
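// Rough usage sketch (illustrative only, not part of this file): a caller that
// wants to drive the server from its own libevent loop, rather than calling
// serve(), might do roughly the following. It assumes a single IO thread,
// since a user-provided event_base is asserted to be single-threaded above;
// processor/socket setup is omitted and depends on the application.
//
//   event_base* base = event_base_new();
//   server.registerEvents(base);   // registers listen + notification events
//   event_base_dispatch(base);     // run the caller's own event loop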
1137
1138/**
1139 * Main workhorse function, starts up the server listening on a port and
1140 * loops over the libevent handler.
1141 */
1142void TNonblockingServer::serve() {
1143
Konrad Grochowski1f6e3802015-05-18 18:10:06 +02001144 if (ioThreads_.empty())
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001145 registerEvents(NULL);
Roger Meier6f2a5032013-07-08 23:35:25 +02001146
Jake Farrellb0d95602011-12-06 01:17:26 +00001147 // Run the primary (listener) IO thread loop in our main thread; this will
1148 // only return when the server is shutting down.
1149 ioThreads_[0]->run();
1150
1151 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001152 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001153 ioThreads_[i]->join();
1154 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1155 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001156}
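// Note: serve() runs IO thread #0's event loop on the calling thread, so the
// call blocks until stop() breaks the loops, after which all IO threads are
// joined and serve() returns.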
1157
Jake Farrellb0d95602011-12-06 01:17:26 +00001158TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1159 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001160 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001161 bool useHighPriority)
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001162 : server_(server),
1163 number_(number),
1164 listenSocket_(listenSocket),
1165 useHighPriority_(useHighPriority),
1166 eventBase_(NULL),
1167 ownEventBase_(false) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001168 notificationPipeFDs_[0] = -1;
1169 notificationPipeFDs_[1] = -1;
1170}
1171
1172TNonblockingIOThread::~TNonblockingIOThread() {
1173 // make sure our associated thread is fully finished
1174 join();
1175
Roger Meier6f2a5032013-07-08 23:35:25 +02001176 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001177 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001178 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001179 }
1180
gzshi41945622017-01-06 10:47:03 +08001181 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001182 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001183 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001184 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001185 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001186 }
1187
1188 for (int i = 0; i < 2; ++i) {
1189 if (notificationPipeFDs_[i] >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001190 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001191 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001192 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001193 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001194 notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001195 }
1196 }
1197}
1198
1199void TNonblockingIOThread::createNotificationPipe() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001200 if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
Roger Meier12d70532011-12-14 23:35:28 +00001201 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
Jake Farrellb0d95602011-12-06 01:17:26 +00001202 throw TException("can't create notification pipe");
1203 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001204 if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1205 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001206 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1207 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1208 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
Jake Farrellb0d95602011-12-06 01:17:26 +00001209 }
1210 for (int i = 0; i < 2; ++i) {
Roger Meier12d70532011-12-14 23:35:28 +00001211#if LIBEVENT_VERSION_NUMBER < 0x02000000
1212 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001213 if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
1214 || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
Roger Meier12d70532011-12-14 23:35:28 +00001215#else
1216 if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
1217#endif
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001218 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1219 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001220 throw TException(
1221 "TNonblockingServer::createNotificationPipe() "
1222 "FD_CLOEXEC");
Jake Farrellb0d95602011-12-06 01:17:26 +00001223 }
1224 }
1225}
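// The notification pipe created above is the channel worker threads use to
// hand completed work back to this IO thread: notify() writes a raw
// TConnection pointer into one end, and notifyHandler() reads it out on the
// IO thread and drives the connection's state transition.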
1226
1227/**
1228 * Register the core libevent events onto the proper base.
1229 */
1230void TNonblockingIOThread::registerEvents() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001231 threadId_ = Thread::get_current();
1232
1233 assert(eventBase_ == 0);
1234 eventBase_ = getServer()->getUserEventBase();
1235 if (eventBase_ == NULL) {
1236 eventBase_ = event_base_new();
1237 ownEventBase_ = true;
1238 }
1239
1240 // Print some libevent stats
1241 if (number_ == 0) {
1242 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001243 event_get_version(),
1244 event_base_get_method(eventBase_));
Roger Meier6f2a5032013-07-08 23:35:25 +02001245 }
1246
gzshi41945622017-01-06 10:47:03 +08001247 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001248 // Register the server event
1249 event_set(&serverEvent_,
1250 listenSocket_,
1251 EV_READ | EV_PERSIST,
1252 TNonblockingIOThread::listenHandler,
1253 server_);
1254 event_base_set(eventBase_, &serverEvent_);
1255
1256 // Add the event and start up the server
1257 if (-1 == event_add(&serverEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001258 throw TException(
1259 "TNonblockingServer::serve(): "
1260 "event_add() failed on server listen event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001261 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001262 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001263 }
1264
1265 createNotificationPipe();
1266
1267 // Create an event to be notified when a task finishes
1268 event_set(&notificationEvent_,
1269 getNotificationRecvFD(),
1270 EV_READ | EV_PERSIST,
1271 TNonblockingIOThread::notifyHandler,
1272 this);
1273
1274 // Attach to the base
1275 event_base_set(eventBase_, &notificationEvent_);
1276
1277 // Add the event and start up the server
1278 if (-1 == event_add(&notificationEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001279 throw TException(
1280 "TNonblockingServer::serve(): "
1281 "event_add() failed on task-done notification event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001282 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001283 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001284}
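// Summary: only the IO thread that owns the listen socket (thread #0) gets a
// libevent read event for accepting connections; every IO thread, however,
// gets a notification event so task completions can be routed back to it.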
1285
1286bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001287 THRIFT_SOCKET fd = getNotificationSendFD();
Jake Farrellb0d95602011-12-06 01:17:26 +00001288 if (fd < 0) {
1289 return false;
1290 }
1291
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001292 fd_set wfds, efds;
tpcwangf98d59f2016-03-23 16:18:52 -07001293 long ret = -1;
1294 long kSize = sizeof(conn);
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +09001295 const char* pos = reinterpret_cast<const char*>(&conn);
abadcafe38772c92015-04-03 22:23:04 +08001296
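  // Write the raw pointer to the notification socket. The write may be split
  // across several send() calls, so select() is used to wait for writability
  // (and to notice errors) until all sizeof(conn) bytes have been sent.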
1297 while (kSize > 0) {
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001298 FD_ZERO(&wfds);
1299 FD_ZERO(&efds);
1300 FD_SET(fd, &wfds);
1301 FD_SET(fd, &efds);
tpcwangf98d59f2016-03-23 16:18:52 -07001302 ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
abadcafe38772c92015-04-03 22:23:04 +08001303 if (ret < 0) {
1304 return false;
1305 } else if (ret == 0) {
1306 continue;
1307 }
1308
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001309 if (FD_ISSET(fd, &efds)) {
1310 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001311 return false;
1312 }
1313
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001314 if (FD_ISSET(fd, &wfds)) {
abadcafe38772c92015-04-03 22:23:04 +08001315 ret = send(fd, pos, kSize, 0);
1316 if (ret < 0) {
1317        if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
1318 continue;
1319 }
1320
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001321 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001322 return false;
1323 }
1324
1325 kSize -= ret;
1326 pos += ret;
1327 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001328 }
1329
1330 return true;
1331}
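// Note: a NULL connection pointer is the sentinel that tells the receiving IO
// thread to shut down; breakLoop() uses notify(NULL) for exactly that purpose,
// and notifyHandler() checks for it below.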
1332
1333/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001334void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001335 TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
Jake Farrellb0d95602011-12-06 01:17:26 +00001336 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001337 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001338
1339 while (true) {
1340 TNonblockingServer::TConnection* connection = 0;
1341 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001342 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001343 if (nBytes == kSize) {
1344 if (connection == NULL) {
1345 // this is the command to stop our thread, exit the handler!
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001346 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001347 return;
1348 }
1349 connection->transition();
1350 } else if (nBytes > 0) {
1351      // partial read: a short read can't yield a valid pointer, so log it and break the loop
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001352      GlobalOutput.printf("notifyHandler: Bad read of %ld bytes, wanted %d", nBytes, kSize);
Jake Farrellb0d95602011-12-06 01:17:26 +00001353 ioThread->breakLoop(true);
1354 return;
1355 } else if (nBytes == 0) {
1356 GlobalOutput.printf("notifyHandler: Notify socket closed!");
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001357 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001358 // exit the loop
1359 break;
1360 } else { // nBytes < 0
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001361 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1362 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1363 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1364 ioThread->breakLoop(true);
1365 return;
Jake Farrellb0d95602011-12-06 01:17:26 +00001366 }
1367 // exit the loop
1368 break;
1369 }
1370 }
1371}
1372
1373void TNonblockingIOThread::breakLoop(bool error) {
1374 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001375 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001376 // TODO: figure out something better to do here, but for now kill the
1377 // whole process.
1378 GlobalOutput.printf("TNonblockingServer: aborting process.");
1379 ::abort();
1380 }
1381
Jake Farrellb0d95602011-12-06 01:17:26 +00001382  // If we are not the IO thread, wake it with a NULL notification so it
1383  // falls out of its event loop. If we are already on the IO thread, the
1384  // loop cannot be blocked waiting for events right now, so we can simply
1385  // break it directly.
Roger Meier12d70532011-12-14 23:35:28 +00001386 if (!Thread::is_current(threadId_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001387 notify(NULL);
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001388 } else {
1389 // cause the loop to stop ASAP - even if it has things to do in it
1390 event_base_loopbreak(eventBase_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001391 }
1392}
1393
1394void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001395#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001396 // Start out with a standard, low-priority setup for the sched params.
1397 struct sched_param sp;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001398  memset(&sp, 0, sizeof(sp));
Jake Farrellb0d95602011-12-06 01:17:26 +00001399 int policy = SCHED_OTHER;
1400
1401 // If desired, set up high-priority sched params structure.
1402 if (value) {
1403 // FIFO scheduler, ranked above default SCHED_OTHER queue
1404 policy = SCHED_FIFO;
1405 // The priority only compares us to other SCHED_FIFO threads, so we
1406    // just pick the priority halfway between min & max.
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001407 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
Jake Farrellb0d95602011-12-06 01:17:26 +00001408
1409 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001410 }
1411
Jake Farrellb0d95602011-12-06 01:17:26 +00001412 // Actually set the sched params for the current thread.
1413 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001414 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001415 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001416 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001417 }
Roger Meierd051ca02013-08-15 01:35:11 +02001418#else
1419 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001420#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001421}
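// Note: switching a thread to SCHED_FIFO typically requires elevated
// privileges (e.g. root, CAP_SYS_NICE, or an RLIMIT_RTPRIO allowance on
// Linux), so pthread_setschedparam() may fail here; the failure is only
// logged, not treated as fatal.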
Bryan Duxbury76c43682011-08-24 21:26:48 +00001422
Jake Farrellb0d95602011-12-06 01:17:26 +00001423void TNonblockingIOThread::run() {
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001424 if (eventBase_ == NULL) {
Roger Meier6f2a5032013-07-08 23:35:25 +02001425 registerEvents();
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001426 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001427 if (useHighPriority_) {
1428 setCurrentThreadHighPriority(true);
1429 }
1430
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001431 if (eventBase_ != NULL)
1432 {
1433 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
1434 // Run libevent engine, never returns, invokes calls to eventHandler
1435 event_base_loop(eventBase_, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001436
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001437 if (useHighPriority_) {
1438 setCurrentThreadHighPriority(false);
1439 }
1440
1441 // cleans up our registered events
1442 cleanupEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001443 }
1444
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001445 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001446}
1447
1448void TNonblockingIOThread::cleanupEvents() {
1449 // stop the listen socket, if any
gzshi41945622017-01-06 10:47:03 +08001450 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001451 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001452 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001453 }
1454 }
1455
1456 event_del(&notificationEvent_);
1457}
1458
Jake Farrellb0d95602011-12-06 01:17:26 +00001459void TNonblockingIOThread::stop() {
1460 // This should cause the thread to fall out of its event loop ASAP.
1461 breakLoop(false);
1462}
1463
1464void TNonblockingIOThread::join() {
1465 // If this was a thread created by a factory (not the thread that called
1466 // serve()), we join() it to make sure we shut down fully.
1467 if (thread_) {
1468 try {
1469      // Note that it is safe to call join() more than once, and to join the
1470      // current thread, since the pthread implementation checks for deadlock.
1471 thread_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001472 } catch (...) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001473 // swallow everything
1474 }
1475 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001476}
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001477}
1478}
1479} // apache::thrift::server