blob: 8590bff72a40ceeadc71bcc2775584b8aee2faae [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier3781c242011-12-11 20:07:21 +000020#define __STDC_FORMAT_MACROS
21
Konrad Grochowski9be4e682013-06-22 22:03:31 +020022#include <thrift/thrift-config.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000023
Roger Meier4285ba22013-06-10 21:17:23 +020024#include <thrift/server/TNonblockingServer.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000025#include <thrift/concurrency/Exception.h>
26#include <thrift/transport/TSocket.h>
27#include <thrift/concurrency/PlatformThreadFactory.h>
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040028#include <thrift/transport/PlatformSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029
Mark Sleee02385b2007-06-09 01:21:16 +000030#include <iostream>
Lei Feiweib5ebcd12015-04-04 22:12:07 +080031
32#ifdef HAVE_SYS_SELECT_H
33#include <sys/select.h>
34#endif
Roger Meier30aae0c2011-07-08 12:23:31 +000035
36#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000037#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
40#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000041#include <netinet/in.h>
42#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000043#endif
44
45#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000046#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000047#endif
48
49#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000050#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000051#endif
52
Roger Meier2fa9c312011-09-05 19:15:53 +000053#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000054#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000055#endif
56
Mark Slee2f6404d2006-10-10 01:37:40 +000057#include <assert.h>
Roger Meier12d70532011-12-14 23:35:28 +000058
59#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +000060#include <sched.h>
Roger Meier12d70532011-12-14 23:35:28 +000061#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000062
David Reiss9b903442009-10-21 05:51:28 +000063#ifndef AF_LOCAL
64#define AF_LOCAL AF_UNIX
65#endif
66
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040067#if !defined(PRIu32)
Roger Meier12d70532011-12-14 23:35:28 +000068#define PRIu32 "I32u"
Roger Meierf2b094f2013-06-04 22:09:37 +020069#define PRIu64 "I64u"
Roger Meier12d70532011-12-14 23:35:28 +000070#endif
71
Konrad Grochowski16a23a62014-11-13 15:33:38 +010072namespace apache {
73namespace thrift {
74namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000075
T Jake Lucianib5e62212009-01-31 22:36:20 +000076using namespace apache::thrift::protocol;
77using namespace apache::thrift::transport;
78using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000079using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000080using apache::thrift::transport::TSocket;
81using apache::thrift::transport::TTransportException;
Jake Farrellb0d95602011-12-06 01:17:26 +000082using boost::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000083
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING, ///< reading the 4-byte frame-size header
  SOCKET_RECV,         ///< reading the body of the frame
  SOCKET_SEND          ///< writing the response back to the client
};
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000086
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for the task processing the request to finish
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
103
104/**
105 * Represents a connection that is handled via libevent. This connection
106 * essentially encapsulates a socket that has some associated libevent state.
107 */
108class TNonblockingServer::TConnection {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100109private:
Jake Farrellb0d95602011-12-06 01:17:26 +0000110 /// Server IO Thread handling this connection
111 TNonblockingIOThread* ioThread_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000112
113 /// Server handle
114 TNonblockingServer* server_;
115
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000116 /// TProcessor
117 boost::shared_ptr<TProcessor> processor_;
118
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000119 /// Object wrapping network socket
120 boost::shared_ptr<TSocket> tSocket_;
121
122 /// Libevent object
123 struct event event_;
124
125 /// Libevent flags
126 short eventFlags_;
127
128 /// Socket mode
129 TSocketState socketState_;
130
131 /// Application state
132 TAppState appState_;
133
134 /// How much data needed to read
135 uint32_t readWant_;
136
137 /// Where in the read buffer are we
138 uint32_t readBufferPos_;
139
140 /// Read buffer
141 uint8_t* readBuffer_;
142
143 /// Read buffer size
144 uint32_t readBufferSize_;
145
146 /// Write buffer
147 uint8_t* writeBuffer_;
148
149 /// Write buffer size
150 uint32_t writeBufferSize_;
151
152 /// How far through writing are we?
153 uint32_t writeBufferPos_;
154
155 /// Largest size of write buffer seen since buffer was constructed
156 size_t largestWriteBufferSize_;
157
158 /// Count of the number of calls for use with getResizeBufferEveryN().
159 int32_t callsForResize_;
160
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000161 /// Transport to read from
162 boost::shared_ptr<TMemoryBuffer> inputTransport_;
163
164 /// Transport that processor writes to
165 boost::shared_ptr<TMemoryBuffer> outputTransport_;
166
167 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
168 boost::shared_ptr<TTransport> factoryInputTransport_;
169 boost::shared_ptr<TTransport> factoryOutputTransport_;
170
171 /// Protocol decoder
172 boost::shared_ptr<TProtocol> inputProtocol_;
173
174 /// Protocol encoder
175 boost::shared_ptr<TProtocol> outputProtocol_;
176
177 /// Server event handler, if any
178 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
179
180 /// Thrift call context, if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100181 void* connectionContext_;
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000182
183 /// Go into read mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 void setRead() { setFlags(EV_READ | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000185
186 /// Go into write mode
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100187 void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000188
189 /// Set socket idle
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100190 void setIdle() { setFlags(0); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000191
192 /**
193 * Set event flags for this connection.
194 *
195 * @param eventFlags flags we pass to libevent for the connection.
196 */
197 void setFlags(short eventFlags);
198
199 /**
200 * Libevent handler called (via our static wrapper) when the connection
201 * socket had something happen. Rather than use the flags libevent passed,
202 * we use the connection state to determine whether we need to read or
203 * write the socket.
204 */
205 void workSocket();
206
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100207public:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000208 class Task;
209
210 /// Constructor
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100211 TConnection(THRIFT_SOCKET socket,
212 TNonblockingIOThread* ioThread,
213 const sockaddr* addr,
214 socklen_t addrLen) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000215 readBuffer_ = NULL;
216 readBufferSize_ = 0;
217
Jake Farrellb0d95602011-12-06 01:17:26 +0000218 ioThread_ = ioThread;
219 server_ = ioThread->getServer();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000220
Jake Farrellb0d95602011-12-06 01:17:26 +0000221 // Allocate input and output transports these only need to be allocated
222 // once per TConnection (they don't need to be reallocated on init() call)
223 inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400224 outputTransport_.reset(
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100225 new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
Jake Farrellb0d95602011-12-06 01:17:26 +0000226 tSocket_.reset(new TSocket());
227 init(socket, ioThread, addr, addrLen);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000228 }
229
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100230 ~TConnection() { std::free(readBuffer_); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000231
Roger Meier0c04fcc2013-03-22 19:52:08 +0100232 /// Close this connection and free or reset its resources.
233 void close();
234
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100235 /**
236 * Check buffers against any size limits and shrink it if exceeded.
237 *
238 * @param readLimit we reduce read buffer size to this (if nonzero).
239 * @param writeLimit if nonzero and write buffer is larger, replace it.
240 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000241 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
242
243 /// Initialize
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100244 void init(THRIFT_SOCKET socket,
245 TNonblockingIOThread* ioThread,
246 const sockaddr* addr,
247 socklen_t addrLen);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000248
249 /**
250 * This is called when the application transitions from one state into
251 * another. This means that it has finished writing the data that it needed
252 * to, or finished receiving the data that it needed to.
253 */
254 void transition();
255
256 /**
257 * C-callable event handler for connection events. Provides a callback
258 * that libevent can understand which invokes connection_->workSocket().
259 *
260 * @param fd the descriptor the event occurred on.
261 * @param which the flags associated with the event.
262 * @param v void* callback arg where we placed TConnection's "this".
263 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000264 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Konrad Grochowskib7af66e2014-07-08 19:22:44 +0200265 assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000266 ((TConnection*)v)->workSocket();
267 }
268
269 /**
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000270 * Notification to server that processing has ended on this request.
271 * Can be called either when processing is completed or when a waiting
272 * task has been preemptively terminated (on overload).
273 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000274 * Don't call this from the IO thread itself.
275 *
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400276 * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000277 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100278 bool notifyIOThread() { return ioThread_->notify(this); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000279
Jake Farrellb0d95602011-12-06 01:17:26 +0000280 /*
281 * Returns the number of this connection's currently assigned IO
282 * thread.
283 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100284 int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000285
286 /// Force connection shutdown for this connection.
287 void forceClose() {
288 appState_ = APP_CLOSE_CONNECTION;
Jake Farrellb0d95602011-12-06 01:17:26 +0000289 if (!notifyIOThread()) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100290 close();
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000291 throw TException("TConnection::forceClose: failed write on notify pipe");
292 }
293 }
294
295 /// return the server this connection was initialized for.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100296 TNonblockingServer* getServer() const { return server_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000297
298 /// get state of connection.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100299 TAppState getState() const { return appState_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000300
301 /// return the TSocket transport wrapping this network connection
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100302 boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000303
304 /// return the server event handler if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100305 boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000306
307 /// return the Thrift connection context if any
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100308 void* getConnectionContext() { return connectionContext_; }
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000309};
310
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100311class TNonblockingServer::TConnection::Task : public Runnable {
312public:
Mark Sleee02385b2007-06-09 01:21:16 +0000313 Task(boost::shared_ptr<TProcessor> processor,
314 boost::shared_ptr<TProtocol> input,
315 boost::shared_ptr<TProtocol> output,
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100316 TConnection* connection)
317 : processor_(processor),
318 input_(input),
319 output_(output),
320 connection_(connection),
321 serverEventHandler_(connection_->getServerEventHandler()),
322 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000323
324 void run() {
325 try {
David Reiss105961d2010-10-06 17:10:17 +0000326 for (;;) {
Roger Meier72957452013-06-29 00:28:50 +0200327 if (serverEventHandler_) {
David Reiss105961d2010-10-06 17:10:17 +0000328 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
329 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100330 if (!processor_->process(input_, output_, connectionContext_)
331 || !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000332 break;
333 }
334 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000335 } catch (const TTransportException& ttx) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000336 GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
Bryan Duxbury1e987582011-08-25 17:33:03 +0000337 } catch (const bad_alloc&) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000338 GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
Henrique Mendonca962b3532012-09-20 13:19:55 +0000339 exit(1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000340 } catch (const std::exception& x) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000341 GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100342 typeid(x).name(),
343 x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000344 } catch (...) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100345 GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
Mark Sleee02385b2007-06-09 01:21:16 +0000346 }
Mark Slee79b16942007-11-26 19:05:29 +0000347
David Reiss01fe1532010-03-09 05:19:25 +0000348 // Signal completion back to the libevent thread via a pipe
Jake Farrellb0d95602011-12-06 01:17:26 +0000349 if (!connection_->notifyIOThread()) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100350 GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
351 connection_->close();
David Reiss01fe1532010-03-09 05:19:25 +0000352 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000353 }
David Reiss01fe1532010-03-09 05:19:25 +0000354 }
355
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100356 TConnection* getTConnection() { return connection_; }
Mark Sleee02385b2007-06-09 01:21:16 +0000357
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100358private:
Mark Sleee02385b2007-06-09 01:21:16 +0000359 boost::shared_ptr<TProcessor> processor_;
360 boost::shared_ptr<TProtocol> input_;
361 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000362 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000363 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
364 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000365};
Mark Slee5ea15f92007-03-05 22:55:59 +0000366
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400367void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
Jake Farrellb0d95602011-12-06 01:17:26 +0000368 TNonblockingIOThread* ioThread,
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000369 const sockaddr* addr,
370 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000371 tSocket_->setSocketFD(socket);
372 tSocket_->setCachedAddress(addr, addrLen);
373
Jake Farrellb0d95602011-12-06 01:17:26 +0000374 ioThread_ = ioThread;
375 server_ = ioThread->getServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000376 appState_ = APP_INIT;
377 eventFlags_ = 0;
378
379 readBufferPos_ = 0;
380 readWant_ = 0;
381
382 writeBuffer_ = NULL;
383 writeBufferSize_ = 0;
384 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000385 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000386
David Reiss89a12942010-10-06 17:10:52 +0000387 socketState_ = SOCKET_RECV_FRAMING;
David Reiss54bec5d2010-10-06 17:10:45 +0000388 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000389
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000390 // get input/transports
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100391 factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
392 factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000393
394 // Create protocol
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100395 inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
396 outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000397
398 // Set up for any server event handler
399 serverEventHandler_ = server_->getEventHandler();
Roger Meier72957452013-06-29 00:28:50 +0200400 if (serverEventHandler_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100401 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
David Reiss105961d2010-10-06 17:10:17 +0000402 } else {
403 connectionContext_ = NULL;
404 }
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000405
406 // Get the processor
Jake Farrellb0d95602011-12-06 01:17:26 +0000407 processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000408}
409
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000410void TNonblockingServer::TConnection::workSocket() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100411 int got = 0, left = 0, sent = 0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000412 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000413
414 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000415 case SOCKET_RECV_FRAMING:
416 union {
417 uint8_t buf[sizeof(uint32_t)];
Roger Meier3781c242011-12-11 20:07:21 +0000418 uint32_t size;
David Reiss89a12942010-10-06 17:10:52 +0000419 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000420
David Reiss89a12942010-10-06 17:10:52 +0000421 // if we've already received some bytes we kept them here
422 framing.size = readWant_;
423 // determine size of this frame
424 try {
425 // Read from the socket
426 fetch = tSocket_->read(&framing.buf[readBufferPos_],
427 uint32_t(sizeof(framing.size) - readBufferPos_));
428 if (fetch == 0) {
429 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000430 close();
431 return;
432 }
David Reiss89a12942010-10-06 17:10:52 +0000433 readBufferPos_ += fetch;
434 } catch (TTransportException& te) {
435 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
436 close();
437
438 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000439 }
440
David Reiss89a12942010-10-06 17:10:52 +0000441 if (readBufferPos_ < sizeof(framing.size)) {
442 // more needed before frame size is known -- save what we have so far
443 readWant_ = framing.size;
444 return;
445 }
446
447 readWant_ = ntohl(framing.size);
Roger Meier3781c242011-12-11 20:07:21 +0000448 if (readWant_ > server_->getMaxFrameSize()) {
449 // Don't allow giant frame sizes. This prevents bad clients from
450 // causing us to try and allocate a giant buffer.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100451 GlobalOutput.printf(
452 "TNonblockingServer: frame size too large "
453 "(%" PRIu32 " > %" PRIu64
454 ") from client %s. "
455 "Remote side not using TFramedTransport?",
456 readWant_,
457 (uint64_t)server_->getMaxFrameSize(),
458 tSocket_->getSocketInfo().c_str());
David Reiss89a12942010-10-06 17:10:52 +0000459 close();
460 return;
461 }
462 // size known; now get the rest of the frame
463 transition();
464 return;
465
466 case SOCKET_RECV:
467 // It is an error to be in this state if we already have all the data
468 assert(readBufferPos_ < readWant_);
469
David Reiss105961d2010-10-06 17:10:17 +0000470 try {
471 // Read from the socket
472 fetch = readWant_ - readBufferPos_;
473 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100474 } catch (TTransportException& te) {
David Reiss105961d2010-10-06 17:10:17 +0000475 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
476 close();
Mark Slee79b16942007-11-26 19:05:29 +0000477
David Reiss105961d2010-10-06 17:10:17 +0000478 return;
479 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000480
Mark Slee2f6404d2006-10-10 01:37:40 +0000481 if (got > 0) {
482 // Move along in the buffer
483 readBufferPos_ += got;
484
485 // Check that we did not overdo it
486 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000487
Mark Slee2f6404d2006-10-10 01:37:40 +0000488 // We are done reading, move onto the next state
489 if (readBufferPos_ == readWant_) {
490 transition();
491 }
492 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000493 }
494
495 // Whenever we get down here it means a remote disconnect
496 close();
Mark Slee79b16942007-11-26 19:05:29 +0000497
Mark Slee2f6404d2006-10-10 01:37:40 +0000498 return;
499
500 case SOCKET_SEND:
501 // Should never have position past size
502 assert(writeBufferPos_ <= writeBufferSize_);
503
504 // If there is no data to send, then let us move on
505 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000506 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000507 transition();
508 return;
509 }
510
David Reiss105961d2010-10-06 17:10:17 +0000511 try {
512 left = writeBufferSize_ - writeBufferPos_;
513 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100514 } catch (TTransportException& te) {
David Reiss105961d2010-10-06 17:10:17 +0000515 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000516 close();
517 return;
518 }
519
520 writeBufferPos_ += sent;
521
522 // Did we overdo it?
523 assert(writeBufferPos_ <= writeBufferSize_);
524
Mark Slee79b16942007-11-26 19:05:29 +0000525 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000526 if (writeBufferPos_ == writeBufferSize_) {
527 transition();
528 }
529
530 return;
531
532 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000533 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000534 assert(0);
535 }
536}
537
538/**
539 * This is called when the application transitions from one state into
540 * another. This means that it has finished writing the data that it needed
541 * to, or finished receiving the data that it needed to.
542 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000543void TNonblockingServer::TConnection::transition() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000544 // ensure this connection is active right now
545 assert(ioThread_);
546 assert(server_);
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000547
Mark Slee2f6404d2006-10-10 01:37:40 +0000548 // Switch upon the state that we are currently in and move to a new state
549 switch (appState_) {
550
551 case APP_READ_REQUEST:
552 // We are done reading the request, package the read buffer into transport
553 // and get back some data from the dispatch function
554 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000555 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000556 // Prepend four bytes of blank space to the buffer so we can
557 // write the frame size there later.
558 outputTransport_->getWritePtr(4);
559 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000560
David Reiss01fe1532010-03-09 05:19:25 +0000561 server_->incrementActiveProcessors();
562
Mark Sleee02385b2007-06-09 01:21:16 +0000563 if (server_->isThreadPoolProcessing()) {
564 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000565
David Reiss01fe1532010-03-09 05:19:25 +0000566 // Create task and dispatch to the thread manager
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100567 boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
568 new Task(processor_, inputProtocol_, outputProtocol_, this));
David Reiss01fe1532010-03-09 05:19:25 +0000569 // The application is now waiting on the task to finish
570 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000571
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100572 try {
573 server_->addTask(task);
574 } catch (IllegalStateException& ise) {
575 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
576 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
577 close();
Jens Geyerfb05cf62014-12-04 21:49:07 +0100578 } catch (TimedOutException& to) {
579 GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
580 close();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100581 }
Mark Slee402ee282007-08-23 01:43:20 +0000582
David Reiss01fe1532010-03-09 05:19:25 +0000583 // Set this connection idle so that libevent doesn't process more
584 // data on it while we're still waiting for the threadmanager to
585 // finish this task
586 setIdle();
587 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000588 } else {
589 try {
Roger Meier72957452013-06-29 00:28:50 +0200590 if (serverEventHandler_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100591 serverEventHandler_->processContext(connectionContext_, getTSocket());
Roger Meier72957452013-06-29 00:28:50 +0200592 }
Mark Sleee02385b2007-06-09 01:21:16 +0000593 // Invoke the processor
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100594 processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
595 } catch (const TTransportException& ttx) {
596 GlobalOutput.printf(
597 "TNonblockingServer transport error in "
598 "process(): %s",
599 ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000600 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000601 close();
602 return;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100603 } catch (const std::exception& x) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000604 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100605 typeid(x).name(),
606 x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000607 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000608 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000609 return;
610 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000611 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000612 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000613 close();
614 return;
615 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 }
617
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100618 // Intentionally fall through here, the call to process has written into
619 // the writeBuffer_
Mark Slee402ee282007-08-23 01:43:20 +0000620
Mark Sleee02385b2007-06-09 01:21:16 +0000621 case APP_WAIT_TASK:
622 // We have now finished processing a task and the result has been written
623 // into the outputTransport_, so we grab its contents and place them into
624 // the writeBuffer_ for actual writing by the libevent thread
625
David Reiss01fe1532010-03-09 05:19:25 +0000626 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 // Get the result of the operation
628 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
629
630 // If the function call generated return data, then move into the send
631 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000632 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000633 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000634
635 // Move into write state
636 writeBufferPos_ = 0;
637 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000638
David Reissaf787782008-07-03 20:29:34 +0000639 // Put the frame size into the write buffer
640 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
641 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000642
643 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000644 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000645 setWrite();
646
647 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000648 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000649
650 return;
651 }
652
David Reissc51986f2009-03-24 20:01:25 +0000653 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000654 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000655 goto LABEL_APP_INIT;
656
Mark Slee2f6404d2006-10-10 01:37:40 +0000657 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000658 // it's now safe to perform buffer size housekeeping.
659 if (writeBufferSize_ > largestWriteBufferSize_) {
660 largestWriteBufferSize_ = writeBufferSize_;
661 }
662 if (server_->getResizeBufferEveryN() > 0
663 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
664 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
665 server_->getIdleWriteBufferLimit());
666 callsForResize_ = 0;
667 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000668
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100669 // N.B.: We also intentionally fall through here into the INIT state!
Mark Slee2f6404d2006-10-10 01:37:40 +0000670
Mark Slee92f00fb2006-10-25 01:28:17 +0000671 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000672 case APP_INIT:
673
674 // Clear write buffer variables
675 writeBuffer_ = NULL;
676 writeBufferPos_ = 0;
677 writeBufferSize_ = 0;
678
Mark Slee2f6404d2006-10-10 01:37:40 +0000679 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000680 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000681 appState_ = APP_READ_FRAME_SIZE;
682
David Reiss89a12942010-10-06 17:10:52 +0000683 readBufferPos_ = 0;
684
Mark Slee2f6404d2006-10-10 01:37:40 +0000685 // Register read event
686 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000687
Mark Slee2f6404d2006-10-10 01:37:40 +0000688 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000689 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000690
691 return;
692
693 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000694 // We just read the request length
695 // Double the buffer size until it is big enough
696 if (readWant_ > readBufferSize_) {
697 if (readBufferSize_ == 0) {
698 readBufferSize_ = 1;
699 }
700 uint32_t newSize = readBufferSize_;
701 while (readWant_ > newSize) {
702 newSize *= 2;
703 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000704
David Reiss89a12942010-10-06 17:10:52 +0000705 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
706 if (newBuffer == NULL) {
707 // nothing else to be done...
708 throw std::bad_alloc();
709 }
710 readBuffer_ = newBuffer;
711 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000712 }
713
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100714 readBufferPos_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000715
716 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000717 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000718 appState_ = APP_READ_REQUEST;
719
720 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000721 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000722
723 return;
724
David Reiss01fe1532010-03-09 05:19:25 +0000725 case APP_CLOSE_CONNECTION:
726 server_->decrementActiveProcessors();
727 close();
728 return;
729
Mark Slee2f6404d2006-10-10 01:37:40 +0000730 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000731 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000732 assert(0);
733 }
734}
735
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000736void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000737 // Catch the do nothing case
738 if (eventFlags_ == eventFlags) {
739 return;
740 }
741
742 // Delete a previously existing event
743 if (eventFlags_ != 0) {
744 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000745 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000746 return;
747 }
748 }
749
750 // Update in memory structure
751 eventFlags_ = eventFlags;
752
Mark Slee402ee282007-08-23 01:43:20 +0000753 // Do not call event_set if there are no flags
754 if (!eventFlags_) {
755 return;
756 }
757
David Reiss01fe1532010-03-09 05:19:25 +0000758 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000759 * event_set:
760 *
761 * Prepares the event structure &event to be used in future calls to
762 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000763 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000764 *
765 * The events can be either EV_READ, EV_WRITE, or both, indicating
766 * that an application can read or write from the file respectively without
767 * blocking.
768 *
Mark Sleee02385b2007-06-09 01:21:16 +0000769 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 * the event and the type of event which will be one of: EV_TIMEOUT,
771 * EV_SIGNAL, EV_READ, EV_WRITE.
772 *
773 * The additional flag EV_PERSIST makes an event_add() persistent until
774 * event_del() has been called.
775 *
776 * Once initialized, the &event struct can be used repeatedly with
777 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000778 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000779 * when an ev structure has been added to libevent using event_add() the
780 * structure must persist until the event occurs (assuming EV_PERSIST
781 * is not set) or is removed using event_del(). You may not reuse the same
782 * ev structure for multiple monitored descriptors; each descriptor needs
783 * its own ev.
784 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100785 event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
Jake Farrellb0d95602011-12-06 01:17:26 +0000786 event_base_set(ioThread_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000787
788 // Add the event
789 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000790 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000791 }
792}
793
794/**
795 * Closes a connection
796 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000797void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000798 // Delete the registered libevent
799 if (event_del(&event_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400800 GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
David Reiss105961d2010-10-06 17:10:17 +0000801 }
802
Roger Meier72957452013-06-29 00:28:50 +0200803 if (serverEventHandler_) {
David Reiss105961d2010-10-06 17:10:17 +0000804 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000805 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000806 ioThread_ = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000807
808 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000809 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000810
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000811 // close any factory produced transports
812 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000813 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000814
Roger Meier464a3a42014-07-07 21:48:28 +0200815 // release processor and handler
816 processor_.reset();
817
Mark Slee2f6404d2006-10-10 01:37:40 +0000818 // Give this object back to the server that owns it
819 server_->returnConnection(this);
820}
821
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100822void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000823 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000824 free(readBuffer_);
825 readBuffer_ = NULL;
826 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000827 }
David Reiss54bec5d2010-10-06 17:10:45 +0000828
829 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
830 // just start over
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400831 outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
David Reiss54bec5d2010-10-06 17:10:45 +0000832 largestWriteBufferSize_ = 0;
833 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000834}
835
David Reiss8ede8182010-09-02 15:26:28 +0000836TNonblockingServer::~TNonblockingServer() {
Roger Meier0c04fcc2013-03-22 19:52:08 +0100837 // Close any active connections (moves them to the idle connection stack)
838 while (activeConnections_.size()) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100839 activeConnections_.front()->close();
Roger Meier0c04fcc2013-03-22 19:52:08 +0100840 }
David Reiss8ede8182010-09-02 15:26:28 +0000841 // Clean up unused TConnection objects in connectionStack_
842 while (!connectionStack_.empty()) {
843 TConnection* connection = connectionStack_.top();
844 connectionStack_.pop();
845 delete connection;
846 }
Roger Meier0c04fcc2013-03-22 19:52:08 +0100847 // The TNonblockingIOThread objects have shared_ptrs to the Thread
848 // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
849 // objects (as runnable) so these objects will never deallocate without help.
850 while (!ioThreads_.empty()) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100851 boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
852 ioThreads_.pop_back();
853 iot->setThread(boost::shared_ptr<Thread>());
Roger Meier0c04fcc2013-03-22 19:52:08 +0100854 }
David Reiss8ede8182010-09-02 15:26:28 +0000855}
856
Mark Slee2f6404d2006-10-10 01:37:40 +0000857/**
858 * Creates a new connection either by reusing an object off the stack or
859 * by allocating a new one entirely
860 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100861TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
862 const sockaddr* addr,
863 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000864 // Check the stack
Jake Farrellb0d95602011-12-06 01:17:26 +0000865 Guard g(connMutex_);
866
867 // pick an IO thread to handle this connection -- currently round robin
Jake Farrellb0d95602011-12-06 01:17:26 +0000868 assert(nextIOThread_ < ioThreads_.size());
869 int selectedThreadIdx = nextIOThread_;
Ben Craig64935232013-10-09 15:21:38 -0500870 nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +0000871
872 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
873
874 // Check the connection stack to see if we can re-use
875 TConnection* result = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000876 if (connectionStack_.empty()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000877 result = new TConnection(socket, ioThread, addr, addrLen);
878 ++numTConnections_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000879 } else {
Jake Farrellb0d95602011-12-06 01:17:26 +0000880 result = connectionStack_.top();
Mark Slee2f6404d2006-10-10 01:37:40 +0000881 connectionStack_.pop();
Jake Farrellb0d95602011-12-06 01:17:26 +0000882 result->init(socket, ioThread, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000883 }
Roger Meier0c04fcc2013-03-22 19:52:08 +0100884 activeConnections_.push_back(result);
Jake Farrellb0d95602011-12-06 01:17:26 +0000885 return result;
Mark Slee2f6404d2006-10-10 01:37:40 +0000886}
887
888/**
889 * Returns a connection to the stack
890 */
891void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000892 Guard g(connMutex_);
893
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100894 activeConnections_.erase(std::remove(activeConnections_.begin(),
895 activeConnections_.end(),
896 connection),
897 activeConnections_.end());
Roger Meier0c04fcc2013-03-22 19:52:08 +0100898
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100899 if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000900 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000901 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000902 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000903 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000904 connectionStack_.push(connection);
905 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000906}
907
908/**
David Reissa79e4882008-03-05 07:51:47 +0000909 * Server socket had something happen. We accept all waiting client
910 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000911 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400912void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100913 (void)which;
David Reiss3bb5e052010-01-25 19:31:31 +0000914 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000915 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000916
Mark Slee2f6404d2006-10-10 01:37:40 +0000917 // Server socket accepted a new connection
918 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000919 sockaddr_storage addrStorage;
920 sockaddr* addrp = (sockaddr*)&addrStorage;
921 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000922
Mark Slee2f6404d2006-10-10 01:37:40 +0000923 // Going to accept a new client socket
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400924 THRIFT_SOCKET clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000925
Mark Slee2f6404d2006-10-10 01:37:40 +0000926 // Accept as many new clients as possible, even though libevent signaled only
927 // one, this helps us to avoid having to go back into the libevent engine so
928 // many times
David Reiss105961d2010-10-06 17:10:17 +0000929 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000930 // If we're overloaded, take action here
931 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000932 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000933 nConnectionsDropped_++;
934 nTotalConnectionsDropped_++;
935 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400936 ::THRIFT_CLOSESOCKET(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000937 return;
David Reiss01fe1532010-03-09 05:19:25 +0000938 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
939 if (!drainPendingTask()) {
940 // Nothing left to discard, so we drop connection instead.
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400941 ::THRIFT_CLOSESOCKET(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000942 return;
David Reiss01fe1532010-03-09 05:19:25 +0000943 }
944 }
945 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000946
Mark Slee2f6404d2006-10-10 01:37:40 +0000947 // Explicitly set this socket to NONBLOCK mode
948 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100949 if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
950 || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
951 GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
952 THRIFT_GET_SOCKET_ERROR);
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400953 ::THRIFT_CLOSESOCKET(clientSocket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000954 return;
955 }
956
957 // Create a new TConnection for this client socket.
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100958 TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000959
960 // Fail fast if we could not create a TConnection object
961 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000962 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400963 ::THRIFT_CLOSESOCKET(clientSocket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000964 return;
965 }
966
Jake Farrellb0d95602011-12-06 01:17:26 +0000967 /*
968 * Either notify the ioThread that is assigned this connection to
969 * start processing, or if it is us, we'll just ask this
970 * connection to do its initial state change here.
971 *
972 * (We need to avoid writing to our own notification pipe, to
973 * avoid possible deadlocks if the pipe is full.)
974 *
975 * The IO thread #0 is the only one that handles these listen
976 * events, so unless the connection has been assigned to thread #0
977 * we know it's not on our thread.
978 */
979 if (clientConnection->getIOThreadNumber() == 0) {
980 clientConnection->transition();
981 } else {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100982 if (!clientConnection->notifyIOThread()) {
983 GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
984 returnConnection(clientConnection);
985 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000986 }
David Reiss3e7fca42009-09-19 01:59:13 +0000987
988 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000989 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000990 }
Mark Slee79b16942007-11-26 19:05:29 +0000991
Mark Slee2f6404d2006-10-10 01:37:40 +0000992 // Done looping accept, now we have to make sure the error is due to
993 // blocking. Any other error is a problem
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400994 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
995 GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
Mark Slee2f6404d2006-10-10 01:37:40 +0000996 }
997}
998
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result (IPv4
 * addresses can be mapped into IPv6 space), creates and binds the socket and
 * then passes it to listenSocket().
 *
 * @throws TException          if getaddrinfo() or socket() fails
 * @throws TTransportException if bind() fails; carries the bind error code
 */
void TNonblockingServer::createAndListenOnSocket() {
  THRIFT_SOCKET s;

  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for the decimal form of any int (at most 3 digits per byte,
  // plus sign and terminator), so sprintf cannot overrun the buffer even if
  // port_ holds an out-of-range value.
  char port[3 * sizeof(int) + 2];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    throw TException("TNonblockingServer::serve() getaddrinfo "
                     + string(THRIFT_GAI_STRERROR(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry when none is IPv6.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Clear IPV6_V6ONLY on an IPv6 socket; failure is logged, not fatal.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
  // (best effort: the result is intentionally not checked).
  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
    // Capture the bind error before the cleanup calls below get a chance to
    // overwrite the thread's last socket error.
    const int bindError = THRIFT_GET_SOCKET_ERROR;
    ::THRIFT_CLOSESOCKET(s);
    freeaddrinfo(res0);
    throw TTransportException(TTransportException::NOT_OPEN,
                              "TNonblockingServer::serve() bind",
                              bindError);
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1064
1065/**
1066 * Takes a socket created by listenSocket() and sets various options on it
1067 * to prepare for use in the server.
1068 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001069void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001070 // Set socket to nonblocking mode
1071 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001072 if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
1073 || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001074 ::THRIFT_CLOSESOCKET(s);
1075 throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001076 }
1077
1078 int one = 1;
1079 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001080
1081 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001082 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001083
1084 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001085 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001086
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001087// Set TCP nodelay if available, MAC OS X Hack
1088// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1089#ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001090 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001091#endif
Mark Slee2f6404d2006-10-10 01:37:40 +00001092
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001093#ifdef TCP_LOW_MIN_RTO
David Reiss1c20c872010-03-09 05:20:14 +00001094 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001095 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001096 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001097#endif
David Reiss1c20c872010-03-09 05:20:14 +00001098
Mark Slee79b16942007-11-26 19:05:29 +00001099 if (listen(s, LISTEN_BACKLOG) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001100 ::THRIFT_CLOSESOCKET(s);
Mark Slee79b16942007-11-26 19:05:29 +00001101 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001102 }
1103
Mark Slee79b16942007-11-26 19:05:29 +00001104 // Cool, this socket is good to go, set it as the serverSocket_
1105 serverSocket_ = s;
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001106
1107 if (!port_) {
1108 sockaddr_in addr;
1109 unsigned int size = sizeof(addr);
1110 if (!getsockname(serverSocket_, reinterpret_cast<sockaddr*>(&addr), &size)) {
1111 listenPort_ = ntohs(addr.sin_port);
1112 } else {
1113 GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR);
1114 }
1115 }
Mark Slee79b16942007-11-26 19:05:29 +00001116}
1117
David Reiss068f4162010-03-09 05:19:45 +00001118void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1119 threadManager_ = threadManager;
Roger Meier72957452013-06-29 00:28:50 +02001120 if (threadManager) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001121 threadManager->setExpireCallback(
1122 apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
1123 this,
1124 apache::thrift::stdcxx::placeholders::_1));
David Reiss068f4162010-03-09 05:19:45 +00001125 threadPoolProcessing_ = true;
1126 } else {
1127 threadPoolProcessing_ = false;
1128 }
1129}
1130
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001131bool TNonblockingServer::serverOverloaded() {
David Reiss01fe1532010-03-09 05:19:25 +00001132 size_t activeConnections = numTConnections_ - connectionStack_.size();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001133 if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
David Reiss01fe1532010-03-09 05:19:25 +00001134 if (!overloaded_) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001135 GlobalOutput.printf("TNonblockingServer: overload condition begun.");
David Reiss01fe1532010-03-09 05:19:25 +00001136 overloaded_ = true;
1137 }
1138 } else {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001139 if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
1140 && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1141 GlobalOutput.printf(
1142 "TNonblockingServer: overload ended; "
1143 "%u dropped (%llu total)",
1144 nConnectionsDropped_,
1145 nTotalConnectionsDropped_);
David Reiss01fe1532010-03-09 05:19:25 +00001146 nConnectionsDropped_ = 0;
1147 overloaded_ = false;
1148 }
1149 }
1150
1151 return overloaded_;
1152}
1153
1154bool TNonblockingServer::drainPendingTask() {
1155 if (threadManager_) {
1156 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1157 if (task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001158 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1159 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss01fe1532010-03-09 05:19:25 +00001160 connection->forceClose();
1161 return true;
1162 }
1163 }
1164 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001165}
1166
David Reiss068f4162010-03-09 05:19:45 +00001167void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001168 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1169 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001170 connection->forceClose();
1171}
1172
Jake Farrellb0d95602011-12-06 01:17:26 +00001173void TNonblockingServer::stop() {
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001174 if (!port_) {
1175 listenPort_ = 0;
1176 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001177 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001178 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001179 ioThreads_[i]->stop();
1180 }
1181}
1182
Roger Meier6f2a5032013-07-08 23:35:25 +02001183void TNonblockingServer::registerEvents(event_base* user_event_base) {
1184 userEventBase_ = user_event_base;
1185
Jake Farrellb0d95602011-12-06 01:17:26 +00001186 // init listen socket
Roger Meiere802aa42013-07-19 21:10:54 +02001187 if (serverSocket_ == THRIFT_INVALID_SOCKET)
Roger Meier6f2a5032013-07-08 23:35:25 +02001188 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001189
Jake Farrellb0d95602011-12-06 01:17:26 +00001190 // set up the IO threads
1191 assert(ioThreads_.empty());
1192 if (!numIOThreads_) {
1193 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001194 }
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001195 // User-provided event-base doesn't works for multi-threaded servers
1196 assert(numIOThreads_ == 1 || !userEventBase_);
David Reiss01fe1532010-03-09 05:19:25 +00001197
Roger Meierd0cdecf2011-12-08 19:34:01 +00001198 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001199 // the first IO thread also does the listening on server socket
Roger Meier0be9ffa2013-07-19 21:10:01 +02001200 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
Mark Slee2f6404d2006-10-10 01:37:40 +00001201
Jake Farrellb0d95602011-12-06 01:17:26 +00001202 shared_ptr<TNonblockingIOThread> thread(
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001203 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
Jake Farrellb0d95602011-12-06 01:17:26 +00001204 ioThreads_.push_back(thread);
1205 }
1206
1207 // Notify handler of the preServe event
Roger Meier72957452013-06-29 00:28:50 +02001208 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001209 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001210 }
1211
Jake Farrellb0d95602011-12-06 01:17:26 +00001212 // Start all of our helper IO threads. Note that the threads run forever,
1213 // only terminating if stop() is called.
1214 assert(ioThreads_.size() == numIOThreads_);
1215 assert(ioThreads_.size() > 0);
1216
1217 GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +09001218 listenPort_,
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001219 ioThreads_.size());
Jake Farrellb0d95602011-12-06 01:17:26 +00001220
1221 // Launch all the secondary IO threads in separate threads
1222 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001223 ioThreadFactory_.reset(new PlatformThreadFactory(
Nobuaki Sukegawa28256642014-12-16 03:24:37 +09001224#if !USE_BOOST_THREAD && !USE_STD_THREAD
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001225 PlatformThreadFactory::OTHER, // scheduler
1226 PlatformThreadFactory::NORMAL, // priority
1227 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001228#endif
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001229 false // detached
1230 ));
Jake Farrellb0d95602011-12-06 01:17:26 +00001231
1232 assert(ioThreadFactory_.get());
1233
1234 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001235 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001236 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1237 ioThreads_[i]->setThread(thread);
1238 thread->start();
1239 }
1240 }
1241
Roger Meier6f2a5032013-07-08 23:35:25 +02001242 // Register the events for the primary (listener) IO thread
1243 ioThreads_[0]->registerEvents();
1244}
1245
1246/**
1247 * Main workhorse function, starts up the server listening on a port and
1248 * loops over the libevent handler.
1249 */
1250void TNonblockingServer::serve() {
1251
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +09001252 if(ioThreads_.empty())
1253 registerEvents(NULL);
Roger Meier6f2a5032013-07-08 23:35:25 +02001254
Jake Farrellb0d95602011-12-06 01:17:26 +00001255 // Run the primary (listener) IO thread loop in our main thread; this will
1256 // only return when the server is shutting down.
1257 ioThreads_[0]->run();
1258
1259 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001260 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001261 ioThreads_[i]->join();
1262 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1263 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001264}
1265
Jake Farrellb0d95602011-12-06 01:17:26 +00001266TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1267 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001268 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001269 bool useHighPriority)
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001270 : server_(server),
1271 number_(number),
1272 listenSocket_(listenSocket),
1273 useHighPriority_(useHighPriority),
1274 eventBase_(NULL),
1275 ownEventBase_(false) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001276 notificationPipeFDs_[0] = -1;
1277 notificationPipeFDs_[1] = -1;
1278}
1279
1280TNonblockingIOThread::~TNonblockingIOThread() {
1281 // make sure our associated thread is fully finished
1282 join();
1283
Roger Meier6f2a5032013-07-08 23:35:25 +02001284 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001285 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001286 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001287 }
1288
Jake Farrellb0d95602011-12-06 01:17:26 +00001289 if (listenSocket_ >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001290 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001291 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001292 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001293 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001294 }
1295
1296 for (int i = 0; i < 2; ++i) {
1297 if (notificationPipeFDs_[i] >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001298 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001299 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001300 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001301 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001302 notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001303 }
1304 }
1305}
1306
1307void TNonblockingIOThread::createNotificationPipe() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001308 if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
Roger Meier12d70532011-12-14 23:35:28 +00001309 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
Jake Farrellb0d95602011-12-06 01:17:26 +00001310 throw TException("can't create notification pipe");
1311 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001312 if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1313 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001314 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1315 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1316 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
Jake Farrellb0d95602011-12-06 01:17:26 +00001317 }
1318 for (int i = 0; i < 2; ++i) {
Roger Meier12d70532011-12-14 23:35:28 +00001319#if LIBEVENT_VERSION_NUMBER < 0x02000000
1320 int flags;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001321 if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
1322 || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
Roger Meier12d70532011-12-14 23:35:28 +00001323#else
1324 if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
1325#endif
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001326 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1327 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001328 throw TException(
1329 "TNonblockingServer::createNotificationPipe() "
1330 "FD_CLOEXEC");
Jake Farrellb0d95602011-12-06 01:17:26 +00001331 }
1332 }
1333}
1334
1335/**
1336 * Register the core libevent events onto the proper base.
1337 */
1338void TNonblockingIOThread::registerEvents() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001339 threadId_ = Thread::get_current();
1340
1341 assert(eventBase_ == 0);
1342 eventBase_ = getServer()->getUserEventBase();
1343 if (eventBase_ == NULL) {
1344 eventBase_ = event_base_new();
1345 ownEventBase_ = true;
1346 }
1347
1348 // Print some libevent stats
1349 if (number_ == 0) {
1350 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001351 event_get_version(),
1352 event_base_get_method(eventBase_));
Roger Meier6f2a5032013-07-08 23:35:25 +02001353 }
1354
Jake Farrellb0d95602011-12-06 01:17:26 +00001355 if (listenSocket_ >= 0) {
1356 // Register the server event
1357 event_set(&serverEvent_,
1358 listenSocket_,
1359 EV_READ | EV_PERSIST,
1360 TNonblockingIOThread::listenHandler,
1361 server_);
1362 event_base_set(eventBase_, &serverEvent_);
1363
1364 // Add the event and start up the server
1365 if (-1 == event_add(&serverEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001366 throw TException(
1367 "TNonblockingServer::serve(): "
1368 "event_add() failed on server listen event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001369 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001370 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001371 }
1372
1373 createNotificationPipe();
1374
1375 // Create an event to be notified when a task finishes
1376 event_set(&notificationEvent_,
1377 getNotificationRecvFD(),
1378 EV_READ | EV_PERSIST,
1379 TNonblockingIOThread::notifyHandler,
1380 this);
1381
1382 // Attach to the base
1383 event_base_set(eventBase_, &notificationEvent_);
1384
1385 // Add the event and start up the server
1386 if (-1 == event_add(&notificationEvent_, 0)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001387 throw TException(
1388 "TNonblockingServer::serve(): "
1389 "event_add() failed on task-done notification event");
Jake Farrellb0d95602011-12-06 01:17:26 +00001390 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001391 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001392}
1393
1394bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001395 THRIFT_SOCKET fd = getNotificationSendFD();
Jake Farrellb0d95602011-12-06 01:17:26 +00001396 if (fd < 0) {
1397 return false;
1398 }
1399
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001400 fd_set wfds, efds;
abadcafe38772c92015-04-03 22:23:04 +08001401 int ret = -1;
abadcafe38772c92015-04-03 22:23:04 +08001402 int kSize = sizeof(conn);
1403 const char * pos = (const char *)const_cast_sockopt(&conn);
1404
1405 while (kSize > 0) {
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001406 FD_ZERO(&wfds);
1407 FD_ZERO(&efds);
1408 FD_SET(fd, &wfds);
1409 FD_SET(fd, &efds);
1410 ret = select(fd + 1, NULL, &wfds, &efds, NULL);
abadcafe38772c92015-04-03 22:23:04 +08001411 if (ret < 0) {
1412 return false;
1413 } else if (ret == 0) {
1414 continue;
1415 }
1416
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001417 if (FD_ISSET(fd, &efds)) {
1418 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001419 return false;
1420 }
1421
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001422 if (FD_ISSET(fd, &wfds)) {
abadcafe38772c92015-04-03 22:23:04 +08001423 ret = send(fd, pos, kSize, 0);
1424 if (ret < 0) {
1425 if (errno == EAGAIN) {
1426 continue;
1427 }
1428
Lei Feiweib5ebcd12015-04-04 22:12:07 +08001429 ::THRIFT_CLOSESOCKET(fd);
abadcafe38772c92015-04-03 22:23:04 +08001430 return false;
1431 }
1432
1433 kSize -= ret;
1434 pos += ret;
1435 }
Jake Farrellb0d95602011-12-06 01:17:26 +00001436 }
1437
1438 return true;
1439}
1440
1441/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001442void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001443 TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
Jake Farrellb0d95602011-12-06 01:17:26 +00001444 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001445 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001446
1447 while (true) {
1448 TNonblockingServer::TConnection* connection = 0;
1449 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001450 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001451 if (nBytes == kSize) {
1452 if (connection == NULL) {
1453 // this is the command to stop our thread, exit the handler!
1454 return;
1455 }
1456 connection->transition();
1457 } else if (nBytes > 0) {
1458 // throw away these bytes and hope that next time we get a solid read
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001459 GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
Jake Farrellb0d95602011-12-06 01:17:26 +00001460 ioThread->breakLoop(true);
1461 return;
1462 } else if (nBytes == 0) {
1463 GlobalOutput.printf("notifyHandler: Notify socket closed!");
1464 // exit the loop
1465 break;
1466 } else { // nBytes < 0
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001467 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1468 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1469 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1470 ioThread->breakLoop(true);
1471 return;
Jake Farrellb0d95602011-12-06 01:17:26 +00001472 }
1473 // exit the loop
1474 break;
1475 }
1476 }
1477}
1478
1479void TNonblockingIOThread::breakLoop(bool error) {
1480 if (error) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001481 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001482 // TODO: figure out something better to do here, but for now kill the
1483 // whole process.
1484 GlobalOutput.printf("TNonblockingServer: aborting process.");
1485 ::abort();
1486 }
1487
1488 // sets a flag so that the loop exits on the next event
Bryan Duxbury76c43682011-08-24 21:26:48 +00001489 event_base_loopbreak(eventBase_);
1490
Jake Farrellb0d95602011-12-06 01:17:26 +00001491 // event_base_loopbreak() only causes the loop to exit the next time
1492 // it wakes up. We need to force it to wake up, in case there are
1493 // no real events it needs to process.
Bryan Duxbury76c43682011-08-24 21:26:48 +00001494 //
Jake Farrellb0d95602011-12-06 01:17:26 +00001495 // If we're running in the same thread, we can't use the notify(0)
1496 // mechanism to stop the thread, but happily if we're running in the
1497 // same thread, this means the thread can't be blocking in the event
1498 // loop either.
Roger Meier12d70532011-12-14 23:35:28 +00001499 if (!Thread::is_current(threadId_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001500 notify(NULL);
1501 }
1502}
1503
1504void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001505#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001506 // Start out with a standard, low-priority setup for the sched params.
1507 struct sched_param sp;
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001508 bzero((void*)&sp, sizeof(sp));
Jake Farrellb0d95602011-12-06 01:17:26 +00001509 int policy = SCHED_OTHER;
1510
1511 // If desired, set up high-priority sched params structure.
1512 if (value) {
1513 // FIFO scheduler, ranked above default SCHED_OTHER queue
1514 policy = SCHED_FIFO;
1515 // The priority only compares us to other SCHED_FIFO threads, so we
1516 // just pick a random priority halfway between min & max.
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001517 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
Jake Farrellb0d95602011-12-06 01:17:26 +00001518
1519 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001520 }
1521
Jake Farrellb0d95602011-12-06 01:17:26 +00001522 // Actually set the sched params for the current thread.
1523 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001524 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001525 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001526 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001527 }
Roger Meierd051ca02013-08-15 01:35:11 +02001528#else
1529 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001530#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001531}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001532
Jake Farrellb0d95602011-12-06 01:17:26 +00001533void TNonblockingIOThread::run() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001534 if (eventBase_ == NULL)
1535 registerEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001536
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001537 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001538
1539 if (useHighPriority_) {
1540 setCurrentThreadHighPriority(true);
1541 }
1542
1543 // Run libevent engine, never returns, invokes calls to eventHandler
1544 event_base_loop(eventBase_, 0);
1545
1546 if (useHighPriority_) {
1547 setCurrentThreadHighPriority(false);
1548 }
1549
1550 // cleans up our registered events
1551 cleanupEvents();
1552
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001553 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
Jake Farrellb0d95602011-12-06 01:17:26 +00001554}
1555
1556void TNonblockingIOThread::cleanupEvents() {
1557 // stop the listen socket, if any
1558 if (listenSocket_ >= 0) {
1559 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001560 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001561 }
1562 }
1563
1564 event_del(&notificationEvent_);
1565}
1566
Jake Farrellb0d95602011-12-06 01:17:26 +00001567void TNonblockingIOThread::stop() {
1568 // This should cause the thread to fall out of its event loop ASAP.
1569 breakLoop(false);
1570}
1571
1572void TNonblockingIOThread::join() {
1573 // If this was a thread created by a factory (not the thread that called
1574 // serve()), we join() it to make sure we shut down fully.
1575 if (thread_) {
1576 try {
1577 // Note that it is safe to both join() ourselves twice, as well as join
1578 // the current thread as the pthread implementation checks for deadlock.
1579 thread_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001580 } catch (...) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001581 // swallow everything
1582 }
1583 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001584}
Konrad Grochowski16a23a62014-11-13 15:33:38 +01001585}
1586}
1587} // apache::thrift::server