/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using stdcxx::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task to complete (when a thread pool processes the call)
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
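
// Typical lifecycle of a connection, as driven by TConnection::transition() below:
// APP_INIT -> APP_READ_FRAME_SIZE -> APP_READ_REQUEST -> APP_WAIT_TASK (when a
// thread pool processes the call) -> APP_SEND_RESULT -> back to APP_INIT for the
// next request. APP_CLOSE_CONNECTION is only entered to tear a connection down,
// e.g. on overload or when a queued task expires.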

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  stdcxx::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  stdcxx::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data still needs to be read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  stdcxx::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  stdcxx::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  stdcxx::shared_ptr<TTransport> factoryInputTransport_;
  stdcxx::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  stdcxx::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  stdcxx::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(stdcxx::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit if nonzero and the read buffer is larger, free it.
   * @param writeLimit if nonzero and the write buffer has grown larger, replace it
   *                   with a default-sized one.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// Set the socket for this connection
  void setSocket(stdcxx::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  stdcxx::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  stdcxx::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};
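
// Note on reuse: TConnection objects (and their read buffers, memory transports
// and protocols) are pooled by the server. createConnection() pops a previously
// used object off connectionStack_ when one is available and re-init()s it with
// the freshly accepted socket; returnConnection() pushes it back, or deletes it
// once connectionStackLimit_ has been reached.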

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(stdcxx::shared_ptr<TProcessor> processor,
       stdcxx::shared_ptr<TProtocol> input,
       stdcxx::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  stdcxx::shared_ptr<TProcessor> processor_;
  stdcxx::shared_ptr<TProtocol> input_;
  stdcxx::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
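
// A Task runs on a ThreadManager worker thread: it invokes the processor in a
// loop, draining any pipelined requests still buffered in the input transport
// (hence the peek() check), and then wakes the connection's IO thread through
// the notification pipe so the libevent loop can send the result.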

void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports from the configured factories
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::setSocket(stdcxx::shared_ptr<TSocket> socket) {
  tSocket_ = socket;
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;
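
    // The framing expected here is TFramedTransport's wire format: a 4-byte frame
    // length in network byte order, followed by that many bytes of payload. The
    // union above lets the length prefix be accumulated byte-by-byte across
    // partial reads and then reinterpreted as a uint32_t.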

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets some operations need to be retried.
      // The current approach is to parse the exception message, but a better solution needs to be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();

        return;
      }
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();

    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
    // despite having more data.
    if (tSocket_->hasPendingDataToRead()) {
      workSocket();
    }

    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets some operations need to be retried.
      // The current approach is to parse the exception message, but a better solution needs to be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
      }

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == NULL;
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // fallthrough

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // fallthrough

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
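
    // The frame length is written back into the first four bytes of readBuffer_
    // in network byte order, so for header transports the buffer later handed to
    // inputTransport_ holds the complete frame including its length prefix, while
    // the non-header path in APP_READ_REQUEST simply skips those four bytes.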

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ && event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
    return;
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  setIdle();

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    stdcxx::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(stdcxx::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(stdcxx::shared_ptr<TSocket> socket) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->setSocket(socket);
    result->init(ioThread);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  stdcxx::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}

void TNonblockingServer::setThreadManager(stdcxx::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
                                     this,
                                     apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
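    // Hysteresis: once overloaded, stay overloaded until load drops below
    // overloadHysteresis_ (a fraction of the configured limits), so the server
    // does not flap between accepting and dropping connections near the threshold.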
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    stdcxx::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}
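
// Both drainPendingTask() and expireClose() shed load by forceClose()-ing a
// connection that is still waiting on its task: forceClose() switches the state
// to APP_CLOSE_CONNECTION and notifies the owning IO thread, which then finishes
// closing the connection from within its own event loop.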
1064
James E. King, III82ae9572017-08-05 12:23:54 -04001065void TNonblockingServer::expireClose(stdcxx::shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001066 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1067 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001068 connection->forceClose();
1069}
1070
Bugra Gedik8bcb7ac2018-01-21 09:43:49 -08001071void TNonblockingServer::stop() {
Jake Farrellb0d95602011-12-06 01:17:26 +00001072 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001073 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001074 ioThreads_[i]->stop();
1075 }
1076}
1077
Roger Meier6f2a5032013-07-08 23:35:25 +02001078void TNonblockingServer::registerEvents(event_base* user_event_base) {
1079 userEventBase_ = user_event_base;
1080
Jake Farrellb0d95602011-12-06 01:17:26 +00001081 // init listen socket
Roger Meiere802aa42013-07-19 21:10:54 +02001082 if (serverSocket_ == THRIFT_INVALID_SOCKET)
Roger Meier6f2a5032013-07-08 23:35:25 +02001083 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001084
Jake Farrellb0d95602011-12-06 01:17:26 +00001085 // set up the IO threads
1086 assert(ioThreads_.empty());
1087 if (!numIOThreads_) {
1088 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001089 }
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001090 // User-provided event-base doesn't works for multi-threaded servers
1091 assert(numIOThreads_ == 1 || !userEventBase_);
David Reiss01fe1532010-03-09 05:19:25 +00001092
Roger Meierd0cdecf2011-12-08 19:34:01 +00001093 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001094 // the first IO thread also does the listening on server socket
Roger Meier0be9ffa2013-07-19 21:10:01 +02001095 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
Mark Slee2f6404d2006-10-10 01:37:40 +00001096
Jake Farrellb0d95602011-12-06 01:17:26 +00001097 shared_ptr<TNonblockingIOThread> thread(
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001098 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
Jake Farrellb0d95602011-12-06 01:17:26 +00001099 ioThreads_.push_back(thread);
1100 }
1101
1102 // Notify handler of the preServe event
Roger Meier72957452013-06-29 00:28:50 +02001103 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001104 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001105 }
1106
Jake Farrellb0d95602011-12-06 01:17:26 +00001107 // Start all of our helper IO threads. Note that the threads run forever,
1108 // only terminating if stop() is called.
1109 assert(ioThreads_.size() == numIOThreads_);
1110 assert(ioThreads_.size() > 0);
1111
Divya Thaluru808d1432017-08-06 16:36:36 -07001112 GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001113 ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +00001114
1115 // Launch all the secondary IO threads in separate threads
1116 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001117 ioThreadFactory_.reset(new PlatformThreadFactory(
Nobuaki Sukegawa28256642014-12-16 03:24:37 +09001118#if !USE_BOOST_THREAD && !USE_STD_THREAD
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001119 PlatformThreadFactory::OTHER, // scheduler
1120 PlatformThreadFactory::NORMAL, // priority
1121 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001122#endif
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001123 false // detached
1124 ));
Jake Farrellb0d95602011-12-06 01:17:26 +00001125
1126 assert(ioThreadFactory_.get());
1127
1128 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001129 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001130 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1131 ioThreads_[i]->setThread(thread);
1132 thread->start();
1133 }
1134 }
1135
Roger Meier6f2a5032013-07-08 23:35:25 +02001136 // Register the events for the primary (listener) IO thread
1137 ioThreads_[0]->registerEvents();
1138}
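
/*
 * Illustrative sketch only (not part of this file): registerEvents() above is
 * what lets a caller drive the server from an externally owned event_base
 * instead of calling serve().  Assuming an already constructed
 * TNonblockingServer named `server` configured for a single IO thread, the
 * caller side would look roughly like:
 *
 *   event_base* base = event_base_new();
 *   server.registerEvents(base);   // listen + notification events go on `base`
 *   event_base_dispatch(base);     // caller owns and runs the loop
 *   event_base_free(base);         // the server never frees a user-owned base
 */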
1139
1140/**
1141 * Main workhorse function: starts the server listening on a port and
1142 * runs the libevent event loop; it returns only when the server is stopped.
1143 */
1144void TNonblockingServer::serve() {
1145
Konrad Grochowski1f6e3802015-05-18 18:10:06 +02001146 if (ioThreads_.empty())
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001147 registerEvents(NULL);
Roger Meier6f2a5032013-07-08 23:35:25 +02001148
Jake Farrellb0d95602011-12-06 01:17:26 +00001149 // Run the primary (listener) IO thread loop in our main thread; this will
1150 // only return when the server is shutting down.
1151 ioThreads_[0]->run();
1152
1153 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001154 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001155 ioThreads_[i]->join();
1156 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1157 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001158}
1159
Jake Farrellb0d95602011-12-06 01:17:26 +00001160TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1161 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001162 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001163 bool useHighPriority)
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001164 : server_(server),
1165 number_(number),
1166 listenSocket_(listenSocket),
1167 useHighPriority_(useHighPriority),
1168 eventBase_(NULL),
1169 ownEventBase_(false) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001170 notificationPipeFDs_[0] = -1;
1171 notificationPipeFDs_[1] = -1;
1172}
1173
1174TNonblockingIOThread::~TNonblockingIOThread() {
1175 // make sure our associated thread is fully finished
1176 join();
1177
Roger Meier6f2a5032013-07-08 23:35:25 +02001178 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001179 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001180 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001181 }
1182
gzshi41945622017-01-06 10:47:03 +08001183 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001184 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001185 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001186 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001187 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001188 }
1189
1190 for (int i = 0; i < 2; ++i) {
1191 if (notificationPipeFDs_[i] >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001192 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001193 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001194 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001195 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001196 notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001197 }
1198 }
1199}
1200
1201void TNonblockingIOThread::createNotificationPipe() {
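  // The "pipe" is actually a local socket pair: notify() writes a TConnection
  // pointer into one end, and the IO thread's event loop wakes up on the other
  // end (see notifyHandler).  Both ends are made non-blocking here and
  // close-on-exec below.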
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001202 if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
Roger Meier12d70532011-12-14 23:35:28 +00001203 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
Jake Farrellb0d95602011-12-06 01:17:26 +00001204 throw TException("can't create notification pipe");
1205 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001206 if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1207 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001208 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1209 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1210 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
Jake Farrellb0d95602011-12-06 01:17:26 +00001211 }
1212 for (int i = 0; i < 2; ++i) {
Roger Meier12d70532011-12-14 23:35:28 +00001213#if LIBEVENT_VERSION_NUMBER < 0x02000000
1214 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001215 if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
1216 || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
Roger Meier12d70532011-12-14 23:35:28 +00001217#else
1218 if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
1219#endif
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001220 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1221 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001222 throw TException(
1223 "TNonblockingServer::createNotificationPipe() "
1224 "FD_CLOEXEC");
Jake Farrellb0d95602011-12-06 01:17:26 +00001225 }
1226 }
1227}
1228
1229/**
1230 * Register the core libevent events onto the proper base.
1231 */
1232void TNonblockingIOThread::registerEvents() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001233 threadId_ = Thread::get_current();
1234
1235 assert(eventBase_ == 0);
1236 eventBase_ = getServer()->getUserEventBase();
1237 if (eventBase_ == NULL) {
1238 eventBase_ = event_base_new();
1239 ownEventBase_ = true;
1240 }
1241
1242 // Print some libevent stats
1243 if (number_ == 0) {
1244 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001245 event_get_version(),
1246 event_base_get_method(eventBase_));
Roger Meier6f2a5032013-07-08 23:35:25 +02001247 }
1248
gzshi41945622017-01-06 10:47:03 +08001249 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001250 // Register the server event
1251 event_set(&serverEvent_,
1252 listenSocket_,
1253 EV_READ | EV_PERSIST,
1254 TNonblockingIOThread::listenHandler,
1255 server_);
1256 event_base_set(eventBase_, &serverEvent_);
1257
1258 // Add the event and start up the server
1259 if (-1 == event_add(&serverEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001260 throw TException(
1261 "TNonblockingServer::serve(): "
1262 "event_add() failed on server listen event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001263 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001264 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001265 }
1266
1267 createNotificationPipe();
1268
1269 // Create an event to be notified when a task finishes
1270 event_set(&notificationEvent_,
1271 getNotificationRecvFD(),
1272 EV_READ | EV_PERSIST,
1273 TNonblockingIOThread::notifyHandler,
1274 this);
1275
1276 // Attach to the base
1277 event_base_set(eventBase_, &notificationEvent_);
1278
1279 // Add the event and start up the server
1280 if (-1 == event_add(&notificationEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001281 throw TException(
1282 "TNonblockingServer::serve(): "
1283 "event_add() failed on task-done notification event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001284 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001285 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001286}
1287
1288bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001289 THRIFT_SOCKET fd = getNotificationSendFD();
Jake Farrellb0d95602011-12-06 01:17:26 +00001290 if (fd < 0) {
1291 return false;
1292 }
1293
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001294 fd_set wfds, efds;
tpcwangf98d59f2016-03-23 16:18:52 -07001295 long ret = -1;
1296 long kSize = sizeof(conn);
Nobuaki Sukegawa8cc91752016-05-15 00:24:41 +09001297 const char* pos = reinterpret_cast<const char*>(&conn);
abadcafe38772c92015-04-03 22:23:04 +08001298
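  // Ship the raw TConnection pointer across the notification pipe; the
  // receiving IO thread reads back exactly sizeof(TConnection*) bytes in
  // notifyHandler() and calls transition() on it.  select() waits for the
  // socket to become writable (or to error out), and the loop copes with
  // partial sends.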
1299 while (kSize > 0) {
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001300 FD_ZERO(&wfds);
1301 FD_ZERO(&efds);
1302 FD_SET(fd, &wfds);
1303 FD_SET(fd, &efds);
tpcwangf98d59f2016-03-23 16:18:52 -07001304 ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
abadcafe38772c92015-04-03 22:23:04 +08001305 if (ret < 0) {
1306 return false;
1307 } else if (ret == 0) {
1308 continue;
1309 }
1310
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001311 if (FD_ISSET(fd, &efds)) {
1312 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001313 return false;
1314 }
1315
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001316 if (FD_ISSET(fd, &wfds)) {
abadcafe38772c92015-04-03 22:23:04 +08001317 ret = send(fd, pos, kSize, 0);
1318 if (ret < 0) {
1319 if (errno == EAGAIN) {
1320 continue;
1321 }
1322
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001323 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001324 return false;
1325 }
1326
1327 kSize -= ret;
1328 pos += ret;
1329 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001330 }
1331
1332 return true;
1333}
1334
1335/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001336void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001337 TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
Jake Farrellb0d95602011-12-06 01:17:26 +00001338 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001339 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001340
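  // Each notification is the raw bytes of a TConnection pointer written by
  // notify(); a NULL pointer is the request to shut this IO thread down.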
1341 while (true) {
1342 TNonblockingServer::TConnection* connection = 0;
1343 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001344 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001345 if (nBytes == kSize) {
1346 if (connection == NULL) {
1347 // this is the command to stop our thread, exit the handler!
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001348 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001349 return;
1350 }
1351 connection->transition();
1352 } else if (nBytes > 0) {
1353 // throw away these bytes and hope that next time we get a solid read
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001354 GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", static_cast<int>(nBytes), kSize);
Jake Farrellb0d95602011-12-06 01:17:26 +00001355 ioThread->breakLoop(true);
1356 return;
1357 } else if (nBytes == 0) {
1358 GlobalOutput.printf("notifyHandler: Notify socket closed!");
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001359 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001360 // exit the loop
1361 break;
1362 } else { // nBytes < 0
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001363 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1364 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1365 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1366 ioThread->breakLoop(true);
1367 return;
Jake Farrellb0d95602011-12-06 01:17:26 +00001368 }
1369 // exit the loop
1370 break;
1371 }
1372 }
1373}
1374
1375void TNonblockingIOThread::breakLoop(bool error) {
1376 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001377 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001378 // TODO: figure out something better to do here, but for now kill the
1379 // whole process.
1380 GlobalOutput.printf("TNonblockingServer: aborting process.");
1381 ::abort();
1382 }
1383
Jake Farrellb0d95602011-12-06 01:17:26 +00001384 // If we're running in the same thread, we can't use the notify(0)
1385 // mechanism to stop the thread, but happily if we're running in the
1386 // same thread, this means the thread can't be blocking in the event
1387 // loop either.
Roger Meier12d70532011-12-14 23:35:28 +00001388 if (!Thread::is_current(threadId_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001389 notify(NULL);
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001390 } else {
1391 // cause the loop to stop ASAP - even if it has things to do in it
1392 event_base_loopbreak(eventBase_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001393 }
1394}
1395
1396void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001397#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001398 // Start out with a standard, low-priority setup for the sched params.
1399 struct sched_param sp;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001400 bzero((void*)&sp, sizeof(sp));
Jake Farrellb0d95602011-12-06 01:17:26 +00001401 int policy = SCHED_OTHER;
1402
1403 // If desired, set up high-priority sched params structure.
1404 if (value) {
1405 // FIFO scheduler, ranked above default SCHED_OTHER queue
1406 policy = SCHED_FIFO;
1407 // The priority only compares us to other SCHED_FIFO threads, so we
1408 // just pick an arbitrary priority halfway between min & max.
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001409 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
Jake Farrellb0d95602011-12-06 01:17:26 +00001410
1411 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001412 }
1413
Jake Farrellb0d95602011-12-06 01:17:26 +00001414 // Actually set the sched params for the current thread.
1415 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001416 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001417 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001418 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001419 }
Roger Meierd051ca02013-08-15 01:35:11 +02001420#else
1421 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001422#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001423}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001424
Jake Farrellb0d95602011-12-06 01:17:26 +00001425void TNonblockingIOThread::run() {
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001426 if (eventBase_ == NULL) {
Roger Meier6f2a5032013-07-08 23:35:25 +02001427 registerEvents();
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001428 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001429 if (useHighPriority_) {
1430 setCurrentThreadHighPriority(true);
1431 }
1432
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001433 if (eventBase_ != NULL)
1434 {
1435 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
1436 // Run libevent engine, never returns, invokes calls to eventHandler
1437 event_base_loop(eventBase_, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001438
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001439 if (useHighPriority_) {
1440 setCurrentThreadHighPriority(false);
1441 }
1442
1443 // cleans up our registered events
1444 cleanupEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001445 }
1446
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001447 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001448}
1449
1450void TNonblockingIOThread::cleanupEvents() {
1451 // stop the listen socket, if any
gzshi41945622017-01-06 10:47:03 +08001452 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001453 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001454 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001455 }
1456 }
1457
1458 event_del(&notificationEvent_);
1459}
1460
Jake Farrellb0d95602011-12-06 01:17:26 +00001461void TNonblockingIOThread::stop() {
1462 // This should cause the thread to fall out of its event loop ASAP.
1463 breakLoop(false);
1464}
1465
1466void TNonblockingIOThread::join() {
1467 // If this was a thread created by a factory (not the thread that called
1468 // serve()), we join() it to make sure we shut down fully.
1469 if (thread_) {
1470 try {
1471 // Note that it is safe to both join() ourselves twice, as well as join
1472 // the current thread as the pthread implementation checks for deadlock.
1473 thread_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001474 } catch (...) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001475 // swallow everything
1476 }
1477 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001478}
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001479}
1480}
1481} // apache::thrift::server