blob: d53535bdadcfbe232b663b2c4a3a80c4b1c8a551 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>
#include <utility>

#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
77
Konrad Grochowski16a23a62014-11-13 15:33:38 +010078namespace apache {
79namespace thrift {
80namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000081
T Jake Lucianib5e62212009-01-31 22:36:20 +000082using namespace apache::thrift::protocol;
83using namespace apache::thrift::transport;
84using namespace apache::thrift::concurrency;
David Reiss1c20c872010-03-09 05:20:14 +000085using apache::thrift::transport::TSocket;
86using apache::thrift::transport::TTransportException;
cyy316723a2019-01-05 16:35:14 +080087using std::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000088
/// Three states for sockets: recv frame size, recv data, and send mode.
/// The state machine that moves a connection between these lives in
/// TConnection::workSocket().
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000091
/**
 * Six states for the nonblocking server:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for a dispatched task to complete (thread-pool mode)
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
108
109/**
110 * Represents a connection that is handled via libevent. This connection
111 * essentially encapsulates a socket that has some associated libevent state.
112 */
113class TNonblockingServer::TConnection {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100114private:
Jake Farrellb0d95602011-12-06 01:17:26 +0000115 /// Server IO Thread handling this connection
116 TNonblockingIOThread* ioThread_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000117
118 /// Server handle
119 TNonblockingServer* server_;
120
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000121 /// TProcessor
cyy316723a2019-01-05 16:35:14 +0800122 std::shared_ptr<TProcessor> processor_;
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000123
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000124 /// Object wrapping network socket
cyy316723a2019-01-05 16:35:14 +0800125 std::shared_ptr<TSocket> tSocket_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000126
127 /// Libevent object
128 struct event event_;
129
130 /// Libevent flags
131 short eventFlags_;
132
133 /// Socket mode
134 TSocketState socketState_;
135
136 /// Application state
137 TAppState appState_;
138
139 /// How much data needed to read
140 uint32_t readWant_;
141
142 /// Where in the read buffer are we
143 uint32_t readBufferPos_;
144
145 /// Read buffer
146 uint8_t* readBuffer_;
147
148 /// Read buffer size
149 uint32_t readBufferSize_;
150
151 /// Write buffer
152 uint8_t* writeBuffer_;
153
154 /// Write buffer size
155 uint32_t writeBufferSize_;
156
157 /// How far through writing are we?
158 uint32_t writeBufferPos_;
159
160 /// Largest size of write buffer seen since buffer was constructed
161 size_t largestWriteBufferSize_;
162
163 /// Count of the number of calls for use with getResizeBufferEveryN().
164 int32_t callsForResize_;
165
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000166 /// Transport to read from
cyy316723a2019-01-05 16:35:14 +0800167 std::shared_ptr<TMemoryBuffer> inputTransport_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000168
169 /// Transport that processor writes to
cyy316723a2019-01-05 16:35:14 +0800170 std::shared_ptr<TMemoryBuffer> outputTransport_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000171
172 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
cyy316723a2019-01-05 16:35:14 +0800173 std::shared_ptr<TTransport> factoryInputTransport_;
174 std::shared_ptr<TTransport> factoryOutputTransport_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000175
176 /// Protocol decoder
cyy316723a2019-01-05 16:35:14 +0800177 std::shared_ptr<TProtocol> inputProtocol_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000178
179 /// Protocol encoder
cyy316723a2019-01-05 16:35:14 +0800180 std::shared_ptr<TProtocol> outputProtocol_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000181
182 /// Server event handler, if any
cyy316723a2019-01-05 16:35:14 +0800183 std::shared_ptr<TServerEventHandler> serverEventHandler_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000184
185 /// Thrift call context, if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100186 void* connectionContext_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000187
188 /// Go into read mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100189 void setRead() { setFlags(EV_READ | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000190
191 /// Go into write mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100192 void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000193
194 /// Set socket idle
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100195 void setIdle() { setFlags(0); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000196
197 /**
198 * Set event flags for this connection.
199 *
200 * @param eventFlags flags we pass to libevent for the connection.
201 */
202 void setFlags(short eventFlags);
203
204 /**
205 * Libevent handler called (via our static wrapper) when the connection
206 * socket had something happen. Rather than use the flags libevent passed,
207 * we use the connection state to determine whether we need to read or
208 * write the socket.
209 */
210 void workSocket();
211
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100212public:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000213 class Task;
214
215 /// Constructor
cyy316723a2019-01-05 16:35:14 +0800216 TConnection(std::shared_ptr<TSocket> socket,
Divya Thaluru808d1432017-08-06 16:36:36 -0700217 TNonblockingIOThread* ioThread) {
Sebastian Zenker042580f2019-01-29 15:48:12 +0100218 readBuffer_ = nullptr;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000219 readBufferSize_ = 0;
220
Jake Farrellb0d95602011-12-06 01:17:26 +0000221 ioThread_ = ioThread;
222 server_ = ioThread->getServer();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000223
Jake Farrellb0d95602011-12-06 01:17:26 +0000224 // Allocate input and output transports these only need to be allocated
225 // once per TConnection (they don't need to be reallocated on init() call)
226 inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400227 outputTransport_.reset(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100228 new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
Divya Thaluru808d1432017-08-06 16:36:36 -0700229
230 tSocket_ = socket;
231
232 init(ioThread);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000233 }
234
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100235 ~TConnection() { std::free(readBuffer_); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000236
Roger Meier0c04fcc2013-03-22 19:52:08 +0100237 /// Close this connection and free or reset its resources.
238 void close();
239
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100240 /**
241 * Check buffers against any size limits and shrink it if exceeded.
242 *
243 * @param readLimit we reduce read buffer size to this (if nonzero).
244 * @param writeLimit if nonzero and write buffer is larger, replace it.
245 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000246 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
247
248 /// Initialize
Divya Thaluru808d1432017-08-06 16:36:36 -0700249 void init(TNonblockingIOThread* ioThread);
250
251 /// set socket for connection
cyy316723a2019-01-05 16:35:14 +0800252 void setSocket(std::shared_ptr<TSocket> socket);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000253
254 /**
255 * This is called when the application transitions from one state into
256 * another. This means that it has finished writing the data that it needed
257 * to, or finished receiving the data that it needed to.
258 */
259 void transition();
260
261 /**
262 * C-callable event handler for connection events. Provides a callback
263 * that libevent can understand which invokes connection_->workSocket().
264 *
265 * @param fd the descriptor the event occurred on.
266 * @param which the flags associated with the event.
267 * @param v void* callback arg where we placed TConnection's "this".
268 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000269 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Konrad Grochowskib7af66e2014-07-08 19:22:44 +0200270 assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000271 ((TConnection*)v)->workSocket();
272 }
273
274 /**
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000275 * Notification to server that processing has ended on this request.
276 * Can be called either when processing is completed or when a waiting
277 * task has been preemptively terminated (on overload).
278 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000279 * Don't call this from the IO thread itself.
280 *
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400281 * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000282 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100283 bool notifyIOThread() { return ioThread_->notify(this); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000284
Jake Farrellb0d95602011-12-06 01:17:26 +0000285 /*
286 * Returns the number of this connection's currently assigned IO
287 * thread.
288 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100289 int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000290
291 /// Force connection shutdown for this connection.
292 void forceClose() {
293 appState_ = APP_CLOSE_CONNECTION;
Jake Farrellb0d95602011-12-06 01:17:26 +0000294 if (!notifyIOThread()) {
Changli Gao257dcef2017-04-06 00:42:01 +0800295 server_->decrementActiveProcessors();
Jens Geyerfb05cf62014-12-04 21:49:07 +0100296 close();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000297 throw TException("TConnection::forceClose: failed write on notify pipe");
298 }
299 }
300
301 /// return the server this connection was initialized for.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100302 TNonblockingServer* getServer() const { return server_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000303
304 /// get state of connection.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100305 TAppState getState() const { return appState_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000306
307 /// return the TSocket transport wrapping this network connection
cyy316723a2019-01-05 16:35:14 +0800308 std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000309
310 /// return the server event handler if any
cyy316723a2019-01-05 16:35:14 +0800311 std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000312
313 /// return the Thrift connection context if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100314 void* getConnectionContext() { return connectionContext_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000315};
316
/**
 * Runnable that executes the processor for one connection on a worker
 * thread (thread-pool mode). Snapshots the connection's event handler and
 * context at construction time.
 */
class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(std::shared_ptr<TProcessor> processor,
       std::shared_ptr<TProtocol> input,
       std::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() override {
    try {
      // Keep processing as long as the processor succeeds and the input
      // transport still has buffered data (pipelined requests in one frame).
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      // Deliberate hard exit: the process is out of memory and cannot
      // reasonably continue serving.
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocol> input_;
  std::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  std::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
Mark Slee5ea15f92007-03-05 22:55:59 +0000373
// (Re)initialize per-use connection state. Called from the constructor and
// again whenever a pooled TConnection is handed a new socket; the transports
// themselves are allocated once in the constructor and only reset here.
void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = nullptr;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol. In header-transport mode a single protocol instance is
  // shared for both directions; otherwise input and output get their own.
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = nullptr;
  }

  // Get the processor (must happen after the protocols exist)
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
416
cyy316723a2019-01-05 16:35:14 +0800417void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700418 tSocket_ = socket;
419}
420
// Drive the per-connection socket state machine: read the 4-byte frame
// header, read the frame body, or flush pending output, depending on
// socketState_. Loops (rather than returning) when the transport reports
// buffered data libevent cannot see (e.g. TLS internal buffers).
void TNonblockingServer::TConnection::workSocket() {
  while (true) {
    int got = 0, left = 0, sent = 0;
    uint32_t fetch = 0;

    switch (socketState_) {
    case SOCKET_RECV_FRAMING:
      // Union lets us accumulate the header byte-by-byte while still
      // reading it back as one 32-bit (network-order) length.
      union {
        uint8_t buf[sizeof(uint32_t)];
        uint32_t size;
      } framing;

      // if we've already received some bytes we kept them here
      framing.size = readWant_;
      // determine size of this frame
      try {
        // Read from the socket
        fetch = tSocket_->read(&framing.buf[readBufferPos_],
                               uint32_t(sizeof(framing.size) - readBufferPos_));
        if (fetch == 0) {
          // Whenever we get here it means a remote disconnect
          close();
          return;
        }
        readBufferPos_ += fetch;
      } catch (TTransportException& te) {
        //In Nonblocking SSLSocket some operations need to be retried again.
        //Current approach is parsing exception message, but a better solution needs to be investigated.
        if(!strstr(te.what(), "retry")) {
          GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
          close();

          return;
        }
        // "retry" errors fall through: wait for the next readable event.
      }

      if (readBufferPos_ < sizeof(framing.size)) {
        // more needed before frame size is known -- save what we have so far
        readWant_ = framing.size;
        return;
      }

      readWant_ = ntohl(framing.size);
      if (readWant_ > server_->getMaxFrameSize()) {
        // Don't allow giant frame sizes. This prevents bad clients from
        // causing us to try and allocate a giant buffer.
        GlobalOutput.printf(
            "TNonblockingServer: frame size too large "
            "(%" PRIu32 " > %" PRIu64
            ") from client %s. "
            "Remote side not using TFramedTransport?",
            readWant_,
            (uint64_t)server_->getMaxFrameSize(),
            tSocket_->getSocketInfo().c_str());
        close();
        return;
      }
      // size known; now get the rest of the frame
      transition();

      // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
      // regular sockets, because if there is more data, libevent will fire the event handler registered for read
      // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
      // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
      // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket,
      // despite having more data.
      if (tSocket_->hasPendingDataToRead())
      {
        continue;
      }

      return;

    case SOCKET_RECV:
      // It is an error to be in this state if we already have all the data
      if (!(readBufferPos_ < readWant_)) {
        GlobalOutput.printf("TNonblockingServer: frame size too short");
        close();
        return;
      }

      try {
        // Read from the socket
        fetch = readWant_ - readBufferPos_;
        got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
      } catch (TTransportException& te) {
        //In Nonblocking SSLSocket some operations need to be retried again.
        //Current approach is parsing exception message, but a better solution needs to be investigated.
        if(!strstr(te.what(), "retry")) {
          GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
          close();
        }
        // NOTE: both the retry and the error path return here; a retryable
        // error simply waits for the next readable event.
        return;
      }

      if (got > 0) {
        // Move along in the buffer
        readBufferPos_ += got;

        // Check that we did not overdo it
        assert(readBufferPos_ <= readWant_);

        // We are done reading, move onto the next state
        if (readBufferPos_ == readWant_) {
          transition();
          // Same TLS-buffered-data concern as in SOCKET_RECV_FRAMING above.
          if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead())
          {
            continue;
          }
        }
        return;
      }

      // Whenever we get down here it means a remote disconnect
      close();

      return;

    case SOCKET_SEND:
      // Should never have position past size
      assert(writeBufferPos_ <= writeBufferSize_);

      // If there is no data to send, then let us move on
      if (writeBufferPos_ == writeBufferSize_) {
        GlobalOutput("WARNING: Send state with no data to send");
        transition();
        return;
      }

      try {
        left = writeBufferSize_ - writeBufferPos_;
        sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
      } catch (TTransportException& te) {
        GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
        close();
        return;
      }

      writeBufferPos_ += sent;

      // Did we overdo it?
      assert(writeBufferPos_ <= writeBufferSize_);

      // We are done!
      if (writeBufferPos_ == writeBufferSize_) {
        transition();
      }

      return;

    default:
      GlobalOutput.printf("Unexpected Socket State %d", socketState_);
      assert(0);
      return;
    }
  }
}
579
Dave Watson792db4e2015-01-16 11:22:01 -0800580bool TNonblockingServer::getHeaderTransport() {
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +0100581 // Currently if there is no output protocol factory,
Dave Watson792db4e2015-01-16 11:22:01 -0800582 // we assume header transport (without having to create
583 // a new transport and check)
Sebastian Zenker042580f2019-01-29 15:48:12 +0100584 return getOutputProtocolFactory() == nullptr;
Dave Watson792db4e2015-01-16 11:22:01 -0800585}
586
Mark Slee2f6404d2006-10-10 01:37:40 +0000587/**
588 * This is called when the application transitions from one state into
589 * another. This means that it has finished writing the data that it needed
590 * to, or finished receiving the data that it needed to.
591 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000592void TNonblockingServer::TConnection::transition() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000593 // ensure this connection is active right now
594 assert(ioThread_);
595 assert(server_);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000596
Mark Slee2f6404d2006-10-10 01:37:40 +0000597 // Switch upon the state that we are currently in and move to a new state
598 switch (appState_) {
599
600 case APP_READ_REQUEST:
601 // We are done reading the request, package the read buffer into transport
602 // and get back some data from the dispatch function
Dave Watson792db4e2015-01-16 11:22:01 -0800603 if (server_->getHeaderTransport()) {
604 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
605 outputTransport_->resetBuffer();
606 } else {
607 // We saved room for the framing size in case header transport needed it,
608 // but just skip it for the non-header case
609 inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
610 outputTransport_->resetBuffer();
611
612 // Prepend four bytes of blank space to the buffer so we can
613 // write the frame size there later.
614 outputTransport_->getWritePtr(4);
615 outputTransport_->wroteBytes(4);
616 }
Mark Slee79b16942007-11-26 19:05:29 +0000617
David Reiss01fe1532010-03-09 05:19:25 +0000618 server_->incrementActiveProcessors();
619
Mark Sleee02385b2007-06-09 01:21:16 +0000620 if (server_->isThreadPoolProcessing()) {
621 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000622
David Reiss01fe1532010-03-09 05:19:25 +0000623 // Create task and dispatch to the thread manager
cyy316723a2019-01-05 16:35:14 +0800624 std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100625 new Task(processor_, inputProtocol_, outputProtocol_, this));
David Reiss01fe1532010-03-09 05:19:25 +0000626 // The application is now waiting on the task to finish
627 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000628
Changli Gaod4fa7062017-03-10 13:25:43 +0800629 // Set this connection idle so that libevent doesn't process more
630 // data on it while we're still waiting for the threadmanager to
631 // finish this task
632 setIdle();
633
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100634 try {
635 server_->addTask(task);
636 } catch (IllegalStateException& ise) {
637 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
638 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
Changli Gaod4fa7062017-03-10 13:25:43 +0800639 server_->decrementActiveProcessors();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100640 close();
Jens Geyerfb05cf62014-12-04 21:49:07 +0100641 } catch (TimedOutException& to) {
642 GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
Changli Gaod4fa7062017-03-10 13:25:43 +0800643 server_->decrementActiveProcessors();
Jens Geyerfb05cf62014-12-04 21:49:07 +0100644 close();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100645 }
Mark Slee402ee282007-08-23 01:43:20 +0000646
David Reiss01fe1532010-03-09 05:19:25 +0000647 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000648 } else {
649 try {
Roger Meier72957452013-06-29 00:28:50 +0200650 if (serverEventHandler_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100651 serverEventHandler_->processContext(connectionContext_, getTSocket());
Roger Meier72957452013-06-29 00:28:50 +0200652 }
Mark Sleee02385b2007-06-09 01:21:16 +0000653 // Invoke the processor
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100654 processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
655 } catch (const TTransportException& ttx) {
656 GlobalOutput.printf(
657 "TNonblockingServer transport error in "
658 "process(): %s",
659 ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000660 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000661 close();
662 return;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100663 } catch (const std::exception& x) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000664 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100665 typeid(x).name(),
666 x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000667 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000668 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000669 return;
670 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000671 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000672 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000673 close();
674 return;
675 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000676 }
James E. King III9bea32f2018-03-16 16:07:42 -0400677 // fallthrough
Mark Slee2f6404d2006-10-10 01:37:40 +0000678
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100679 // Intentionally fall through here, the call to process has written into
680 // the writeBuffer_
Mark Slee402ee282007-08-23 01:43:20 +0000681
Mark Sleee02385b2007-06-09 01:21:16 +0000682 case APP_WAIT_TASK:
683 // We have now finished processing a task and the result has been written
684 // into the outputTransport_, so we grab its contents and place them into
685 // the writeBuffer_ for actual writing by the libevent thread
686
David Reiss01fe1532010-03-09 05:19:25 +0000687 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000688 // Get the result of the operation
689 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
690
691 // If the function call generated return data, then move into the send
692 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000693 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000694 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000695
696 // Move into write state
697 writeBufferPos_ = 0;
698 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000699
David Reissaf787782008-07-03 20:29:34 +0000700 // Put the frame size into the write buffer
Sebastian Zenker042580f2019-01-29 15:48:12 +0100701 auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
David Reissaf787782008-07-03 20:29:34 +0000702 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000703
704 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000705 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000706 setWrite();
707
Mark Slee2f6404d2006-10-10 01:37:40 +0000708 return;
709 }
710
David Reissc51986f2009-03-24 20:01:25 +0000711 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000712 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000713 goto LABEL_APP_INIT;
714
Mark Slee2f6404d2006-10-10 01:37:40 +0000715 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000716 // it's now safe to perform buffer size housekeeping.
717 if (writeBufferSize_ > largestWriteBufferSize_) {
718 largestWriteBufferSize_ = writeBufferSize_;
719 }
720 if (server_->getResizeBufferEveryN() > 0
721 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
722 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
723 server_->getIdleWriteBufferLimit());
724 callsForResize_ = 0;
725 }
James E. King III9bea32f2018-03-16 16:07:42 -0400726 // fallthrough
Mark Slee2f6404d2006-10-10 01:37:40 +0000727
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100728 // N.B.: We also intentionally fall through here into the INIT state!
Mark Slee2f6404d2006-10-10 01:37:40 +0000729
Mark Slee92f00fb2006-10-25 01:28:17 +0000730 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000731 case APP_INIT:
732
733 // Clear write buffer variables
Sebastian Zenker042580f2019-01-29 15:48:12 +0100734 writeBuffer_ = nullptr;
Mark Slee2f6404d2006-10-10 01:37:40 +0000735 writeBufferPos_ = 0;
736 writeBufferSize_ = 0;
737
Mark Slee2f6404d2006-10-10 01:37:40 +0000738 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000739 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000740 appState_ = APP_READ_FRAME_SIZE;
741
David Reiss89a12942010-10-06 17:10:52 +0000742 readBufferPos_ = 0;
743
Mark Slee2f6404d2006-10-10 01:37:40 +0000744 // Register read event
745 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000746
Mark Slee2f6404d2006-10-10 01:37:40 +0000747 return;
748
749 case APP_READ_FRAME_SIZE:
Dave Watson792db4e2015-01-16 11:22:01 -0800750 readWant_ += 4;
751
David Reiss89a12942010-10-06 17:10:52 +0000752 // We just read the request length
753 // Double the buffer size until it is big enough
754 if (readWant_ > readBufferSize_) {
755 if (readBufferSize_ == 0) {
756 readBufferSize_ = 1;
757 }
758 uint32_t newSize = readBufferSize_;
759 while (readWant_ > newSize) {
760 newSize *= 2;
761 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000762
Sebastian Zenker042580f2019-01-29 15:48:12 +0100763 auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
764 if (newBuffer == nullptr) {
David Reiss89a12942010-10-06 17:10:52 +0000765 // nothing else to be done...
766 throw std::bad_alloc();
767 }
768 readBuffer_ = newBuffer;
769 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 }
771
Dave Watson792db4e2015-01-16 11:22:01 -0800772 readBufferPos_ = 4;
773 *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000774
775 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000776 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000777 appState_ = APP_READ_REQUEST;
778
Mark Slee2f6404d2006-10-10 01:37:40 +0000779 return;
780
David Reiss01fe1532010-03-09 05:19:25 +0000781 case APP_CLOSE_CONNECTION:
782 server_->decrementActiveProcessors();
783 close();
784 return;
785
Mark Slee2f6404d2006-10-10 01:37:40 +0000786 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000787 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000788 assert(0);
789 }
790}
791
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000792void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000793 // Catch the do nothing case
794 if (eventFlags_ == eventFlags) {
795 return;
796 }
797
798 // Delete a previously existing event
Buğra Gedik36d1b0d2016-09-04 17:18:15 +0900799 if (eventFlags_ && event_del(&event_) == -1) {
800 GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
801 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000802 }
803
804 // Update in memory structure
805 eventFlags_ = eventFlags;
806
Mark Slee402ee282007-08-23 01:43:20 +0000807 // Do not call event_set if there are no flags
808 if (!eventFlags_) {
809 return;
810 }
811
David Reiss01fe1532010-03-09 05:19:25 +0000812 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000813 * event_set:
814 *
815 * Prepares the event structure &event to be used in future calls to
816 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000817 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000818 *
819 * The events can be either EV_READ, EV_WRITE, or both, indicating
820 * that an application can read or write from the file respectively without
821 * blocking.
822 *
Mark Sleee02385b2007-06-09 01:21:16 +0000823 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000824 * the event and the type of event which will be one of: EV_TIMEOUT,
825 * EV_SIGNAL, EV_READ, EV_WRITE.
826 *
827 * The additional flag EV_PERSIST makes an event_add() persistent until
828 * event_del() has been called.
829 *
830 * Once initialized, the &event struct can be used repeatedly with
831 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000832 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000833 * when an ev structure has been added to libevent using event_add() the
834 * structure must persist until the event occurs (assuming EV_PERSIST
835 * is not set) or is removed using event_del(). You may not reuse the same
836 * ev structure for multiple monitored descriptors; each descriptor needs
837 * its own ev.
838 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100839 event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
Jake Farrellb0d95602011-12-06 01:17:26 +0000840 event_base_set(ioThread_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000841
842 // Add the event
Sebastian Zenker042580f2019-01-29 15:48:12 +0100843 if (event_add(&event_, nullptr) == -1) {
Buğra Gedik36d1b0d2016-09-04 17:18:15 +0900844 GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
Mark Slee2f6404d2006-10-10 01:37:40 +0000845 }
846}
847
848/**
849 * Closes a connection
850 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000851void TNonblockingServer::TConnection::close() {
Changli Gaobf42d552017-03-20 14:29:07 +0800852 setIdle();
David Reiss105961d2010-10-06 17:10:17 +0000853
Roger Meier72957452013-06-29 00:28:50 +0200854 if (serverEventHandler_) {
David Reiss105961d2010-10-06 17:10:17 +0000855 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000856 }
Sebastian Zenker042580f2019-01-29 15:48:12 +0100857 ioThread_ = nullptr;
Mark Slee2f6404d2006-10-10 01:37:40 +0000858
859 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000860 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000861
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000862 // close any factory produced transports
863 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000864 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000865
Roger Meier464a3a42014-07-07 21:48:28 +0200866 // release processor and handler
867 processor_.reset();
868
Mark Slee2f6404d2006-10-10 01:37:40 +0000869 // Give this object back to the server that owns it
870 server_->returnConnection(this);
871}
872
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100873void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000874 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000875 free(readBuffer_);
Sebastian Zenker042580f2019-01-29 15:48:12 +0100876 readBuffer_ = nullptr;
David Reiss89a12942010-10-06 17:10:52 +0000877 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000878 }
David Reiss54bec5d2010-10-06 17:10:45 +0000879
880 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
881 // just start over
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400882 outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
David Reiss54bec5d2010-10-06 17:10:45 +0000883 largestWriteBufferSize_ = 0;
884 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000885}
886
David Reiss8ede8182010-09-02 15:26:28 +0000887TNonblockingServer::~TNonblockingServer() {
Roger Meier0c04fcc2013-03-22 19:52:08 +0100888 // Close any active connections (moves them to the idle connection stack)
uv7471252cf32024-05-11 11:14:25 +0800889 while (!activeConnections_.empty()) {
890 (*activeConnections_.begin())->close();
Roger Meier0c04fcc2013-03-22 19:52:08 +0100891 }
David Reiss8ede8182010-09-02 15:26:28 +0000892 // Clean up unused TConnection objects in connectionStack_
893 while (!connectionStack_.empty()) {
894 TConnection* connection = connectionStack_.top();
895 connectionStack_.pop();
896 delete connection;
897 }
Roger Meier0c04fcc2013-03-22 19:52:08 +0100898 // The TNonblockingIOThread objects have shared_ptrs to the Thread
899 // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
900 // objects (as runnable) so these objects will never deallocate without help.
901 while (!ioThreads_.empty()) {
cyy316723a2019-01-05 16:35:14 +0800902 std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100903 ioThreads_.pop_back();
cyy316723a2019-01-05 16:35:14 +0800904 iot->setThread(std::shared_ptr<Thread>());
Roger Meier0c04fcc2013-03-22 19:52:08 +0100905 }
David Reiss8ede8182010-09-02 15:26:28 +0000906}
907
Mark Slee2f6404d2006-10-10 01:37:40 +0000908/**
909 * Creates a new connection either by reusing an object off the stack or
910 * by allocating a new one entirely
911 */
cyy316723a2019-01-05 16:35:14 +0800912TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000913 // Check the stack
Jake Farrellb0d95602011-12-06 01:17:26 +0000914 Guard g(connMutex_);
915
916 // pick an IO thread to handle this connection -- currently round robin
Jake Farrellb0d95602011-12-06 01:17:26 +0000917 assert(nextIOThread_ < ioThreads_.size());
918 int selectedThreadIdx = nextIOThread_;
Ben Craig64935232013-10-09 15:21:38 -0500919 nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +0000920
921 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
922
923 // Check the connection stack to see if we can re-use
Sebastian Zenker042580f2019-01-29 15:48:12 +0100924 TConnection* result = nullptr;
Mark Slee2f6404d2006-10-10 01:37:40 +0000925 if (connectionStack_.empty()) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700926 result = new TConnection(socket, ioThread);
Jake Farrellb0d95602011-12-06 01:17:26 +0000927 ++numTConnections_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000928 } else {
Jake Farrellb0d95602011-12-06 01:17:26 +0000929 result = connectionStack_.top();
Mark Slee2f6404d2006-10-10 01:37:40 +0000930 connectionStack_.pop();
Divya Thaluru808d1432017-08-06 16:36:36 -0700931 result->setSocket(socket);
932 result->init(ioThread);
Mark Slee2f6404d2006-10-10 01:37:40 +0000933 }
uv7471252cf32024-05-11 11:14:25 +0800934
935 activeConnections_.insert(result);
Jake Farrellb0d95602011-12-06 01:17:26 +0000936 return result;
Mark Slee2f6404d2006-10-10 01:37:40 +0000937}
938
939/**
940 * Returns a connection to the stack
941 */
942void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000943 Guard g(connMutex_);
944
uv7471252cf32024-05-11 11:14:25 +0800945 activeConnections_.erase(connection);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100946 if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000947 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000948 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000949 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000950 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000951 connectionStack_.push(connection);
952 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000953}
954
955/**
David Reissa79e4882008-03-05 07:51:47 +0000956 * Server socket had something happen. We accept all waiting client
957 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000958 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400959void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100960 (void)which;
David Reiss3bb5e052010-01-25 19:31:31 +0000961 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000962 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000963
Mark Slee2f6404d2006-10-10 01:37:40 +0000964 // Going to accept a new client socket
cyy316723a2019-01-05 16:35:14 +0800965 std::shared_ptr<TSocket> clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000966
Divya Thaluru808d1432017-08-06 16:36:36 -0700967 clientSocket = serverTransport_->accept();
968 if (clientSocket) {
David Reiss01fe1532010-03-09 05:19:25 +0000969 // If we're overloaded, take action here
970 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000971 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000972 nConnectionsDropped_++;
973 nTotalConnectionsDropped_++;
974 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700975 clientSocket->close();
David Reiss83b8fda2010-03-09 05:19:34 +0000976 return;
David Reiss01fe1532010-03-09 05:19:25 +0000977 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
978 if (!drainPendingTask()) {
979 // Nothing left to discard, so we drop connection instead.
Divya Thaluru808d1432017-08-06 16:36:36 -0700980 clientSocket->close();
David Reiss83b8fda2010-03-09 05:19:34 +0000981 return;
David Reiss01fe1532010-03-09 05:19:25 +0000982 }
983 }
984 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000985
Mark Slee2f6404d2006-10-10 01:37:40 +0000986 // Create a new TConnection for this client socket.
Divya Thaluru808d1432017-08-06 16:36:36 -0700987 TConnection* clientConnection = createConnection(clientSocket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000988
989 // Fail fast if we could not create a TConnection object
Sebastian Zenker042580f2019-01-29 15:48:12 +0100990 if (clientConnection == nullptr) {
David Reiss01e55c12008-07-13 22:18:51 +0000991 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Divya Thaluru808d1432017-08-06 16:36:36 -0700992 clientSocket->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000993 return;
994 }
995
Jake Farrellb0d95602011-12-06 01:17:26 +0000996 /*
997 * Either notify the ioThread that is assigned this connection to
998 * start processing, or if it is us, we'll just ask this
999 * connection to do its initial state change here.
1000 *
1001 * (We need to avoid writing to our own notification pipe, to
1002 * avoid possible deadlocks if the pipe is full.)
1003 *
1004 * The IO thread #0 is the only one that handles these listen
1005 * events, so unless the connection has been assigned to thread #0
1006 * we know it's not on our thread.
1007 */
1008 if (clientConnection->getIOThreadNumber() == 0) {
1009 clientConnection->transition();
1010 } else {
Jens Geyerfb05cf62014-12-04 21:49:07 +01001011 if (!clientConnection->notifyIOThread()) {
1012 GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
Changli Gao75386db2017-03-10 13:15:37 +08001013 clientConnection->close();
Jens Geyerfb05cf62014-12-04 21:49:07 +01001014 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001015 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001016 }
1017}
1018
1019/**
Mark Slee79b16942007-11-26 19:05:29 +00001020 * Creates a socket to listen on and binds it to the local port.
Mark Slee2f6404d2006-10-10 01:37:40 +00001021 */
Jake Farrellb0d95602011-12-06 01:17:26 +00001022void TNonblockingServer::createAndListenOnSocket() {
Divya Thaluru808d1432017-08-06 16:36:36 -07001023 serverTransport_->listen();
1024 serverSocket_ = serverTransport_->getSocketFD();
Mark Slee79b16942007-11-26 19:05:29 +00001025}
1026
Mark Slee79b16942007-11-26 19:05:29 +00001027
cyy316723a2019-01-05 16:35:14 +08001028void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
David Reiss068f4162010-03-09 05:19:45 +00001029 threadManager_ = threadManager;
Roger Meier72957452013-06-29 00:28:50 +02001030 if (threadManager) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001031 threadManager->setExpireCallback(
cyy316723a2019-01-05 16:35:14 +08001032 std::bind(&TNonblockingServer::expireClose,
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001033 this,
cyy316723a2019-01-05 16:35:14 +08001034 std::placeholders::_1));
David Reiss068f4162010-03-09 05:19:45 +00001035 threadPoolProcessing_ = true;
1036 } else {
1037 threadPoolProcessing_ = false;
1038 }
1039}
1040
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001041bool TNonblockingServer::serverOverloaded() {
David Reiss01fe1532010-03-09 05:19:25 +00001042 size_t activeConnections = numTConnections_ - connectionStack_.size();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001043 if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
David Reiss01fe1532010-03-09 05:19:25 +00001044 if (!overloaded_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001045 GlobalOutput.printf("TNonblockingServer: overload condition begun.");
David Reiss01fe1532010-03-09 05:19:25 +00001046 overloaded_ = true;
1047 }
1048 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001049 if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
1050 && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1051 GlobalOutput.printf(
1052 "TNonblockingServer: overload ended; "
1053 "%u dropped (%llu total)",
1054 nConnectionsDropped_,
1055 nTotalConnectionsDropped_);
David Reiss01fe1532010-03-09 05:19:25 +00001056 nConnectionsDropped_ = 0;
1057 overloaded_ = false;
1058 }
1059 }
1060
1061 return overloaded_;
1062}
1063
1064bool TNonblockingServer::drainPendingTask() {
1065 if (threadManager_) {
cyy316723a2019-01-05 16:35:14 +08001066 std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
David Reiss01fe1532010-03-09 05:19:25 +00001067 if (task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001068 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1069 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss01fe1532010-03-09 05:19:25 +00001070 connection->forceClose();
1071 return true;
1072 }
1073 }
1074 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001075}
1076
cyy316723a2019-01-05 16:35:14 +08001077void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001078 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1079 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001080 connection->forceClose();
1081}
1082
Bugra Gedik8bcb7ac2018-01-21 09:43:49 -08001083void TNonblockingServer::stop() {
Jake Farrellb0d95602011-12-06 01:17:26 +00001084 // Breaks the event loop in all threads so that they end ASAP.
cyy64750162019-02-08 13:40:59 +08001085 for (auto & ioThread : ioThreads_) {
1086 ioThread->stop();
Jake Farrellb0d95602011-12-06 01:17:26 +00001087 }
1088}
1089
Roger Meier6f2a5032013-07-08 23:35:25 +02001090void TNonblockingServer::registerEvents(event_base* user_event_base) {
1091 userEventBase_ = user_event_base;
1092
Jake Farrellb0d95602011-12-06 01:17:26 +00001093 // init listen socket
Roger Meiere802aa42013-07-19 21:10:54 +02001094 if (serverSocket_ == THRIFT_INVALID_SOCKET)
Roger Meier6f2a5032013-07-08 23:35:25 +02001095 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001096
Jake Farrellb0d95602011-12-06 01:17:26 +00001097 // set up the IO threads
1098 assert(ioThreads_.empty());
1099 if (!numIOThreads_) {
1100 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001101 }
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001102 // User-provided event-base doesn't works for multi-threaded servers
1103 assert(numIOThreads_ == 1 || !userEventBase_);
David Reiss01fe1532010-03-09 05:19:25 +00001104
Roger Meierd0cdecf2011-12-08 19:34:01 +00001105 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001106 // the first IO thread also does the listening on server socket
Roger Meier0be9ffa2013-07-19 21:10:01 +02001107 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
Mark Slee2f6404d2006-10-10 01:37:40 +00001108
Jake Farrellb0d95602011-12-06 01:17:26 +00001109 shared_ptr<TNonblockingIOThread> thread(
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001110 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
Jake Farrellb0d95602011-12-06 01:17:26 +00001111 ioThreads_.push_back(thread);
1112 }
1113
1114 // Notify handler of the preServe event
Roger Meier72957452013-06-29 00:28:50 +02001115 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001116 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001117 }
1118
Jake Farrellb0d95602011-12-06 01:17:26 +00001119 // Start all of our helper IO threads. Note that the threads run forever,
1120 // only terminating if stop() is called.
1121 assert(ioThreads_.size() == numIOThreads_);
1122 assert(ioThreads_.size() > 0);
1123
Divya Thaluru808d1432017-08-06 16:36:36 -07001124 GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001125 ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +00001126
1127 // Launch all the secondary IO threads in separate threads
1128 if (ioThreads_.size() > 1) {
cyyca8af9b2019-01-11 22:13:12 +08001129 ioThreadFactory_.reset(new ThreadFactory(
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001130 false // detached
1131 ));
Jake Farrellb0d95602011-12-06 01:17:26 +00001132
1133 assert(ioThreadFactory_.get());
1134
1135 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001136 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001137 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1138 ioThreads_[i]->setThread(thread);
1139 thread->start();
1140 }
1141 }
1142
Roger Meier6f2a5032013-07-08 23:35:25 +02001143 // Register the events for the primary (listener) IO thread
1144 ioThreads_[0]->registerEvents();
1145}
1146
1147/**
1148 * Main workhorse function, starts up the server listening on a port and
1149 * loops over the libevent handler.
1150 */
1151void TNonblockingServer::serve() {
1152
Konrad Grochowski1f6e3802015-05-18 18:10:06 +02001153 if (ioThreads_.empty())
Sebastian Zenker042580f2019-01-29 15:48:12 +01001154 registerEvents(nullptr);
Roger Meier6f2a5032013-07-08 23:35:25 +02001155
Jake Farrellb0d95602011-12-06 01:17:26 +00001156 // Run the primary (listener) IO thread loop in our main thread; this will
1157 // only return when the server is shutting down.
1158 ioThreads_[0]->run();
1159
1160 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001161 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001162 ioThreads_[i]->join();
1163 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1164 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001165}
1166
Jake Farrellb0d95602011-12-06 01:17:26 +00001167TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1168 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001169 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001170 bool useHighPriority)
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001171 : server_(server),
1172 number_(number),
cyy64750162019-02-08 13:40:59 +08001173 threadId_{},
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001174 listenSocket_(listenSocket),
1175 useHighPriority_(useHighPriority),
Sebastian Zenker042580f2019-01-29 15:48:12 +01001176 eventBase_(nullptr),
cyy64750162019-02-08 13:40:59 +08001177 ownEventBase_(false),
1178 serverEvent_{},
1179 notificationEvent_{} {
Jake Farrellb0d95602011-12-06 01:17:26 +00001180 notificationPipeFDs_[0] = -1;
1181 notificationPipeFDs_[1] = -1;
1182}
1183
1184TNonblockingIOThread::~TNonblockingIOThread() {
1185 // make sure our associated thread is fully finished
1186 join();
1187
Roger Meier6f2a5032013-07-08 23:35:25 +02001188 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001189 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001190 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001191 }
1192
gzshi41945622017-01-06 10:47:03 +08001193 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001194 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001195 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001196 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001197 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001198 }
1199
cyy64750162019-02-08 13:40:59 +08001200 for (auto notificationPipeFD : notificationPipeFDs_) {
1201 if (notificationPipeFD >= 0) {
1202 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001203 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001204 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001205 }
cyy64750162019-02-08 13:40:59 +08001206 notificationPipeFD = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001207 }
1208 }
1209}
1210
/**
 * Creates the socketpair used to wake this IO thread from other threads,
 * making both ends non-blocking and close-on-exec.
 *
 * @throws TException if the pair cannot be created or configured; the
 *         descriptors are closed before throwing in the latter cases.
 */
void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  // Both ends must be non-blocking so a full pipe can never stall a thread.
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  // Mark both descriptors close-on-exec; older libevent lacks the helper,
  // so fall back to fcntl() there.
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}
1238
1239/**
1240 * Register the core libevent events onto the proper base.
1241 */
/**
 * Registers this IO thread's libevent state on the calling thread: obtains
 * or creates the event base, registers the listen event (thread #0 only),
 * creates the notification pipe, and registers its read event.
 *
 * @throws TException if either event_add() call fails.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  // Use the caller-provided event base if one was set on the server;
  // otherwise create (and own) a fresh one.
  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats (only once, from the primary thread)
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  // Only the thread that owns the listen socket (thread #0) registers it.
  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
1297
/**
 * Wake up this IO thread by writing the raw connection pointer (or nullptr,
 * the shutdown sentinel) down the notification pipe.  The pipe is
 * non-blocking, so we wait for writability with poll()/select() and keep
 * looping until all sizeof(conn) bytes have been sent — partial writes are
 * resumed from where they stopped.
 *
 * Returns false if the pipe is unusable (not created yet, peer hung up, or a
 * hard send error); returns true once the full pointer value is written.
 */
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  int ret = -1;
  // kSize counts the bytes of the pointer value still to be written; pos
  // walks across the pointer's byte representation as sends complete.
  long kSize = sizeof(conn);
  const char * pos = (const char *)const_cast_sockopt(&conn);

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  // poll()-based wait (preferred where available).
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (kSize > 0) {
    pfd.revents = 0;
    // Block indefinitely until the pipe is writable or an error occurs.
    ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      // Spurious zero-fd-ready wakeup; just wait again.
      continue;
    }

    if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
      // Receiving end is gone — the pipe is useless from now on.
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          // Pipe buffer filled up between poll() and send(); retry.
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      // Account for a (possibly partial) write and continue if needed.
      kSize -= ret;
      pos += ret;
    }
  }
#else
  // select()-based fallback for platforms without poll().
  fd_set wfds, efds;

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    // Wait until fd is writable (wfds) or has an exceptional condition (efds).
    ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      // Exceptional condition on the pipe — treat it as unusable.
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      // Account for a (possibly partial) write and continue if needed.
      kSize -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}
1379
1380/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001381void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Sebastian Zenker042580f2019-01-29 15:48:12 +01001382 auto* ioThread = (TNonblockingIOThread*)v;
Jake Farrellb0d95602011-12-06 01:17:26 +00001383 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001384 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001385
1386 while (true) {
Sebastian Zenker042580f2019-01-29 15:48:12 +01001387 TNonblockingServer::TConnection* connection = nullptr;
Jake Farrellb0d95602011-12-06 01:17:26 +00001388 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001389 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001390 if (nBytes == kSize) {
Sebastian Zenker042580f2019-01-29 15:48:12 +01001391 if (connection == nullptr) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001392 // this is the command to stop our thread, exit the handler!
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001393 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001394 return;
1395 }
1396 connection->transition();
1397 } else if (nBytes > 0) {
1398 // throw away these bytes and hope that next time we get a solid read
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001399 GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
Jake Farrellb0d95602011-12-06 01:17:26 +00001400 ioThread->breakLoop(true);
1401 return;
1402 } else if (nBytes == 0) {
1403 GlobalOutput.printf("notifyHandler: Notify socket closed!");
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001404 ioThread->breakLoop(false);
Jake Farrellb0d95602011-12-06 01:17:26 +00001405 // exit the loop
1406 break;
1407 } else { // nBytes < 0
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001408 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1409 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1410 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1411 ioThread->breakLoop(true);
1412 return;
Jake Farrellb0d95602011-12-06 01:17:26 +00001413 }
1414 // exit the loop
1415 break;
1416 }
1417 }
1418}
1419
1420void TNonblockingIOThread::breakLoop(bool error) {
1421 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001422 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001423 // TODO: figure out something better to do here, but for now kill the
1424 // whole process.
1425 GlobalOutput.printf("TNonblockingServer: aborting process.");
1426 ::abort();
1427 }
1428
Jake Farrellb0d95602011-12-06 01:17:26 +00001429 // If we're running in the same thread, we can't use the notify(0)
1430 // mechanism to stop the thread, but happily if we're running in the
1431 // same thread, this means the thread can't be blocking in the event
1432 // loop either.
Roger Meier12d70532011-12-14 23:35:28 +00001433 if (!Thread::is_current(threadId_)) {
Sebastian Zenker042580f2019-01-29 15:48:12 +01001434 notify(nullptr);
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001435 } else {
1436 // cause the loop to stop ASAP - even if it has things to do in it
1437 event_base_loopbreak(eventBase_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001438 }
1439}
1440
1441void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001442#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001443 // Start out with a standard, low-priority setup for the sched params.
1444 struct sched_param sp;
Mario Emmenlaueree652152021-08-29 19:05:53 +02001445 memset(static_cast<void*>(&sp), 0, sizeof(sp));
Jake Farrellb0d95602011-12-06 01:17:26 +00001446 int policy = SCHED_OTHER;
1447
1448 // If desired, set up high-priority sched params structure.
1449 if (value) {
1450 // FIFO scheduler, ranked above default SCHED_OTHER queue
1451 policy = SCHED_FIFO;
1452 // The priority only compares us to other SCHED_FIFO threads, so we
1453 // just pick a random priority halfway between min & max.
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001454 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
Jake Farrellb0d95602011-12-06 01:17:26 +00001455
1456 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001457 }
1458
Jake Farrellb0d95602011-12-06 01:17:26 +00001459 // Actually set the sched params for the current thread.
1460 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001461 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001462 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001463 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001464 }
Roger Meierd051ca02013-08-15 01:35:11 +02001465#else
1466 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001467#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001468}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001469
Jake Farrellb0d95602011-12-06 01:17:26 +00001470void TNonblockingIOThread::run() {
Sebastian Zenker042580f2019-01-29 15:48:12 +01001471 if (eventBase_ == nullptr) {
Roger Meier6f2a5032013-07-08 23:35:25 +02001472 registerEvents();
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001473 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001474 if (useHighPriority_) {
1475 setCurrentThreadHighPriority(true);
1476 }
1477
Sebastian Zenker042580f2019-01-29 15:48:12 +01001478 if (eventBase_ != nullptr)
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001479 {
1480 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
1481 // Run libevent engine, never returns, invokes calls to eventHandler
1482 event_base_loop(eventBase_, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001483
Buğra Gedik36d1b0d2016-09-04 17:18:15 +09001484 if (useHighPriority_) {
1485 setCurrentThreadHighPriority(false);
1486 }
1487
1488 // cleans up our registered events
1489 cleanupEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001490 }
1491
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001492 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001493}
1494
1495void TNonblockingIOThread::cleanupEvents() {
1496 // stop the listen socket, if any
gzshi41945622017-01-06 10:47:03 +08001497 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001498 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001499 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001500 }
1501 }
1502
1503 event_del(&notificationEvent_);
1504}
1505
Jake Farrellb0d95602011-12-06 01:17:26 +00001506void TNonblockingIOThread::stop() {
1507 // This should cause the thread to fall out of its event loop ASAP.
1508 breakLoop(false);
1509}
1510
1511void TNonblockingIOThread::join() {
1512 // If this was a thread created by a factory (not the thread that called
1513 // serve()), we join() it to make sure we shut down fully.
1514 if (thread_) {
1515 try {
1516 // Note that it is safe to both join() ourselves twice, as well as join
1517 // the current thread as the pthread implementation checks for deadlock.
1518 thread_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001519 } catch (...) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001520 // swallow everything
1521 }
1522 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001523}
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001524}
1525}
1526} // apache::thrift::server