/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#define __STDC_FORMAT_MACROS

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <iostream>

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

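// Fallback printf() width macros for toolchains whose <inttypes.h> does not
// define PRIu32/PRIu64; the "I32u"/"I64u" forms below are the MSVC-style
// format specifiers.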
#if !defined(PRIu32)
#define PRIu32 "I32u"
#define PRIu64 "I64u"
#endif

namespace apache { namespace thrift { namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using boost::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,
  SOCKET_RECV,
  SOCKET_SEND
};

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for task to complete (if using a thread pool)
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
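
/*
 * Typical per-request flow (driven by transition() below):
 *   APP_INIT -> APP_READ_FRAME_SIZE -> APP_READ_REQUEST ->
 *   [APP_WAIT_TASK when a thread pool is configured] -> APP_SEND_RESULT ->
 *   back to APP_INIT.
 * APP_CLOSE_CONNECTION is only entered to force a shutdown (e.g. on overload).
 */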

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data we need to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Task handle
  int taskHandle_;

  /// Task event
  struct event taskEvent_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

 public:

  class Task;

  /// Constructor
  TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    std::free(readBuffer_);
  }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};

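/*
 * Task wraps a single request for execution on the server's ThreadManager
 * worker pool (thread-pool mode). run() drives the processor until the
 * request has been handled and then wakes the connection's IO thread through
 * its notification pipe so the connection's state machine can resume on that
 * thread.
 */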
class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
    inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
    outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
    factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
    factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_ != NULL) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

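/*
 * Wire format handled below: each request is a framed message, i.e. a 4-byte
 * length prefix in network byte order followed by that many bytes of payload
 * (what TFramedTransport produces). SOCKET_RECV_FRAMING accumulates the four
 * header bytes (possibly across several reads), SOCKET_RECV reads the payload,
 * and SOCKET_SEND writes back the length-prefixed response. Frames larger
 * than server_->getMaxFrameSize() cause the connection to be closed.
 */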
void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf("TNonblockingServer: frame size too large "
                          "(%" PRIu32 " > %" PRIu64 ") from client %s. "
                          "Remote side not using TFramedTransport?",
                          readWant_,
                          (uint64_t)server_->getMaxFrameSize(),
                          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();
    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_,
                                              getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
            TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
  }

  if (serverEventHandler_ != NULL) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
    size_t readLimit,
    size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(boost::shared_ptr<Thread>());
  }
}

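/*
 * Connections are assigned to IO threads round robin (nextIOThread_ advances
 * modulo the number of IO threads); IO thread #0 additionally owns the
 * listening socket (see serve()).
 */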
/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
    THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread, addr, addrLen);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, ioThread, addr, addrLen);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ &&
      (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

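/*
 * Accept-loop behaviour when overloaded: depending on overloadAction_, a new
 * socket is either closed immediately (T_OVERLOAD_CLOSE_ON_ACCEPT) or a
 * queued task is discarded to make room (T_OVERLOAD_DRAIN_TASK_QUEUE),
 * falling back to closing the socket once there is nothing left to drain.
 */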
/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void) which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  THRIFT_SOCKET clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        ::THRIFT_CLOSESOCKET(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          ::THRIFT_CLOSESOCKET(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
        THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      clientConnection->notifyIOThread();
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
  }
}

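/*
 * Called from serve() before the IO threads are started: resolves the
 * wildcard address for port_ with getaddrinfo(), preferring an IPv6 result
 * (IPv4 addresses can be mapped into IPv6 space), binds it, and hands the
 * socket to listenSocket() for the remaining setup.
 */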
/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  THRIFT_SOCKET s;

  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    throw TException("TNonblockingServer::serve() getaddrinfo " +
                     string(THRIFT_GAI_STRERROR(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    freeaddrinfo(res0);
    throw TTransportException(TTransportException::NOT_OPEN,
                              "TNonblockingServer::serve() bind",
                              THRIFT_GET_SOCKET_ERROR);
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes the socket created by createAndListenOnSocket() and sets various
 * options on it to prepare for use in the server.
 */
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
      THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
    ::THRIFT_CLOSESOCKET(s);
    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
#endif

#ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
#endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}

void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager != NULL) {
    threadManager->setExpireCallback(apache::thrift::stdcxx::bind(
        &TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

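/*
 * Overload detection uses hysteresis: the overloaded_ flag is raised as soon
 * as either limit is exceeded, but it is only cleared once both measures have
 * dropped to overloadHysteresis_ times their limit. As an illustrative
 * example, with maxActiveProcessors_ = 100 and overloadHysteresis_ = 0.8,
 * overload begins at 101 active processors and ends only once the count is
 * back down to 80 or fewer.
 */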
bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ ||
      activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf("TNonblockingServer: overload ended; "
                          "%u dropped (%llu total)",
                          nConnectionsDropped_, nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection =
        static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer()
             && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
  TConnection* connection =
    static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() &&
         connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->stop();
  }
}

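/*
 * Illustrative usage sketch (not part of this file; MyServiceHandler and
 * MyServiceProcessor stand in for generated service code):
 *
 *   boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
 *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *   TNonblockingServer server(processor, 9090);
 *
 *   // Optional: run handlers on a worker pool instead of the IO threads.
 *   boost::shared_ptr<ThreadManager> threadManager =
 *       ThreadManager::newSimpleThreadManager(4);
 *   threadManager->threadFactory(
 *       boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
 *   threadManager->start();
 *   server.setThreadManager(threadManager);
 *
 *   server.serve();  // blocks until stop() is called
 */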
Mark Slee79b16942007-11-26 19:05:29 +00001194/**
1195 * Main workhorse function, starts up the server listening on a port and
1196 * loops over the libevent handler.
1197 */
1198void TNonblockingServer::serve() {
Jake Farrellb0d95602011-12-06 01:17:26 +00001199 // init listen socket
1200 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001201
Jake Farrellb0d95602011-12-06 01:17:26 +00001202 // set up the IO threads
1203 assert(ioThreads_.empty());
1204 if (!numIOThreads_) {
1205 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001206 }
1207
Roger Meierd0cdecf2011-12-08 19:34:01 +00001208 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001209 // the first IO thread also does the listening on server socket
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001210 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : -1);
Mark Slee2f6404d2006-10-10 01:37:40 +00001211
Jake Farrellb0d95602011-12-06 01:17:26 +00001212 shared_ptr<TNonblockingIOThread> thread(
1213 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
1214 ioThreads_.push_back(thread);
1215 }
1216
1217 // Notify handler of the preServe event
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001218 if (eventHandler_ != NULL) {
1219 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001220 }
1221
Jake Farrellb0d95602011-12-06 01:17:26 +00001222 // Start all of our helper IO threads. Note that the threads run forever,
1223 // only terminating if stop() is called.
1224 assert(ioThreads_.size() == numIOThreads_);
1225 assert(ioThreads_.size() > 0);
1226
1227 GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
1228 port_, ioThreads_.size());
1229
1230 // Launch all the secondary IO threads in separate threads
1231 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001232 ioThreadFactory_.reset(new PlatformThreadFactory(
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001233#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
Roger Meier12d70532011-12-14 23:35:28 +00001234 PlatformThreadFactory::OTHER, // scheduler
1235 PlatformThreadFactory::NORMAL, // priority
Jake Farrellb0d95602011-12-06 01:17:26 +00001236 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001237#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001238 false // detached
1239 ));
1240
1241 assert(ioThreadFactory_.get());
1242
1243 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001244 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001245 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1246 ioThreads_[i]->setThread(thread);
1247 thread->start();
1248 }
1249 }
1250
1251 // Run the primary (listener) IO thread loop in our main thread; this will
1252 // only return when the server is shutting down.
1253 ioThreads_[0]->run();
1254
1255 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001256 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001257 ioThreads_[i]->join();
1258 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1259 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001260}
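
// A minimal usage sketch, assuming a Thrift-generated `MyServiceProcessor`
// and a user-written `MyServiceHandler` (both names are illustrative and not
// part of this file):
//
//   boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
//   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
//   TNonblockingServer server(processor, 9090);  // listen on port 9090
//   server.setNumIOThreads(4);                   // optional; DEFAULT_IO_THREADS otherwise
//   server.serve();                              // blocks until stop()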

TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
    : server_(server)
    , number_(number)
    , listenSocket_(listenSocket)
    , useHighPriority_(useHighPriority)
    , eventBase_(NULL) {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}

TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_) {
    event_base_free(eventBase_);
  }

  if (listenSocket_ >= 0) {
    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
                          THRIFT_GET_SOCKET_ERROR);
    }
    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
  }

  for (int i = 0; i < 2; ++i) {
    if (notificationPipeFDs_[i] >= 0) {
      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            THRIFT_GET_SOCKET_ERROR);
      }
      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
    }
  }
}

void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 ||
      evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
        THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException("TNonblockingServer::createNotificationPipe() "
                       "FD_CLOEXEC");
    }
  }
}
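
// Note: the notification "pipe" is actually a socketpair created via
// libevent's evutil_socketpair(); on platforms without socketpair() (notably
// Windows) libevent emulates it with a pair of connected loopback sockets,
// which is why the notification code below uses send()/recv() rather than
// write()/read().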

/**
 * Register the core libevent events onto the proper base: the listen event
 * (only on the IO thread that owns the listen socket) and the task-completion
 * notification event (on every IO thread).
 */
void TNonblockingIOThread::registerEvents() {
  if (listenSocket_ >= 0) {
    // Register the server listen event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, 0)) {
      throw TException("TNonblockingServer::serve(): "
                       "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
                        number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the task-done notification event
  if (-1 == event_add(&notificationEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): "
                     "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
                      number_);
}

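/**
 * Wake up this IO thread by writing a raw TConnection pointer (or NULL to
 * request that the thread exit its handler) to the notification socketpair.
 * notifyHandler() on the receiving end reads back exactly
 * sizeof(TConnection*) bytes and either drives the connection's state
 * machine via transition() or returns. Returns false if the pipe has not
 * been created yet or if the send fails.
 */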
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  THRIFT_SOCKET fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  const int kSize = sizeof(conn);
  if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
    return false;
  }

  return true;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    const int kSize = sizeof(connection);
    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // a partial read of the pointer-sized message leaves us unable to
      // recover the connection pointer, so treat it as a fatal error
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK &&
          THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror(
          "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // no more notifications pending, exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf(
      "TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // sets a flag so that the loop exits on the next event
  event_base_loopbreak(eventBase_);

  // event_base_loopbreak() only causes the loop to exit the next time
  // it wakes up. We need to force it to wake up, in case there are
  // no real events it needs to process.
  //
  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(NULL);
  }
}

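// Note: raising a thread to SCHED_FIFO below normally requires elevated
// privileges (e.g. root or CAP_SYS_NICE on Linux); without them
// pthread_setschedparam() fails and the error branch below simply logs it.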
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*) &sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) +
                          sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf(
      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#endif
}

void TNonblockingIOThread::run() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == 0);
  eventBase_ = event_base_new();

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  registerEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
                      number_);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  // Run the libevent engine; this only returns once breakLoop() is invoked,
  // dispatching to listenHandler/notifyHandler/etc. in the meantime.
  event_base_loop(eventBase_, 0);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(false);
  }

  // cleans up our registered events
  cleanupEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
                      number_);
}

void TNonblockingIOThread::cleanupEvents() {
  // remove the listen socket event, if any
  if (listenSocket_ >= 0) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}

void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe both to call join() twice and to join the
      // current thread, as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}

}}} // apache::thrift::server