blob: 7d42a2eebf49e88b324115690520bb7534074880 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier3781c242011-12-11 20:07:21 +000020#define __STDC_FORMAT_MACROS
21
Roger Meier2fa9c312011-09-05 19:15:53 +000022#ifdef HAVE_CONFIG_H
23#include <config.h>
24#endif
25
Mark Slee2f6404d2006-10-10 01:37:40 +000026#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000027#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000028#include <transport/TSocket.h>
Roger Meier12d70532011-12-14 23:35:28 +000029#include <concurrency/PlatformThreadFactory.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000030
Mark Sleee02385b2007-06-09 01:21:16 +000031#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000032
33#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000035#endif
36
37#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000038#include <netinet/in.h>
39#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000040#endif
41
42#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000043#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000044#endif
45
46#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000047#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000048#endif
49
Roger Meier2fa9c312011-09-05 19:15:53 +000050#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000051#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000052#endif
53
Mark Slee2f6404d2006-10-10 01:37:40 +000054#include <errno.h>
55#include <assert.h>
Roger Meier12d70532011-12-14 23:35:28 +000056
57#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +000058#include <sched.h>
Roger Meier12d70532011-12-14 23:35:28 +000059#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000060
David Reiss9b903442009-10-21 05:51:28 +000061#ifndef AF_LOCAL
62#define AF_LOCAL AF_UNIX
63#endif
64
Roger Meier12d70532011-12-14 23:35:28 +000065#ifdef _MSC_VER
66#define PRIu32 "I32u"
67#endif
68
T Jake Lucianib5e62212009-01-31 22:36:20 +000069namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000070
T Jake Lucianib5e62212009-01-31 22:36:20 +000071using namespace apache::thrift::protocol;
72using namespace apache::thrift::transport;
73using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000074using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000075using apache::thrift::transport::TSocket;
76using apache::thrift::transport::TTransportException;
Jake Farrellb0d95602011-12-06 01:17:26 +000077using boost::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000078
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000079/// Three states for sockets: recv frame size, recv data, and send mode
80enum TSocketState {
81 SOCKET_RECV_FRAMING,
82 SOCKET_RECV,
83 SOCKET_SEND
84};
85
86/**
87 * Five states for the nonblocking server:
88 * 1) initialize
89 * 2) read 4 byte frame size
90 * 3) read frame of data
91 * 4) send back data (if any)
92 * 5) force immediate connection close
93 */
94enum TAppState {
95 APP_INIT,
96 APP_READ_FRAME_SIZE,
97 APP_READ_REQUEST,
98 APP_WAIT_TASK,
99 APP_SEND_RESULT,
100 APP_CLOSE_CONNECTION
101};
102
103/**
104 * Represents a connection that is handled via libevent. This connection
105 * essentially encapsulates a socket that has some associated libevent state.
106 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor obtained from the server for this connection (see init()).
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object; registered/unregistered via setFlags().
  struct event event_;

  /// Libevent flags currently registered for this connection (0 == idle).
  short eventFlags_;

  /// Socket mode (recv frame size, recv data, or send; see TSocketState).
  TSocketState socketState_;

  /// Application state (see TAppState)
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer; grown with std::realloc() on demand, freed in destructor.
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer; points into outputTransport_'s memory (see transition()),
  /// so it is not freed here.
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Task handle
  int taskHandle_;

  /// Task event
  struct event taskEvent_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void *connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle (no libevent registration; nothing will fire).
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen.  Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

  /// Close this connection and free or reset its resources.
  void close();

 public:

  class Task;

  /// Constructor.  Allocates the reusable transports once, then defers the
  /// per-connection reset work to init().
  TConnection(int socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(new TMemoryBuffer(
                                    server_->getWriteBufferDefaultSize()));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    // readBuffer_ is managed with std::realloc(); free(NULL) is a no-op.
    std::free(readBuffer_);
  }

  /**
   * Check buffers against any size limits and shrink it if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize (reset) this connection's state for the given socket.
  void init(int socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events.  Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check errno).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};
334
class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  /// Captures the server event handler and connection context up front so
  /// they remain stable for the lifetime of the task.
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  /// Runs the processor loop for this connection on a worker thread, then
  /// notifies the IO thread that processing has finished.
  void run() {
    try {
      for (;;) {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        // Stop once process() reports completion, or when no further
        // request bytes are buffered on the input transport.
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
      // NOTE: TTransportException must be caught before std::exception --
      // it is a subclass, and client disconnects are routine, not errors.
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      // Out of memory is unrecoverable here; terminate the process.
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(-1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  /// Accessor for the connection this task is processing.
  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
Mark Slee5ea15f92007-03-05 22:55:59 +0000390
/// Resets all per-connection state for a (possibly recycled) TConnection.
/// Called from the constructor; the transports themselves are allocated
/// once in the constructor and only reset/reused here.
void TNonblockingServer::TConnection::init(int socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  // Fresh connections always start by reading a 4-byte frame header.
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
                             inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
                              outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
                     factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
                      factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_ != NULL) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
438
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000439void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000440 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000441 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000442
443 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000444 case SOCKET_RECV_FRAMING:
445 union {
446 uint8_t buf[sizeof(uint32_t)];
Roger Meier3781c242011-12-11 20:07:21 +0000447 uint32_t size;
David Reiss89a12942010-10-06 17:10:52 +0000448 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000449
David Reiss89a12942010-10-06 17:10:52 +0000450 // if we've already received some bytes we kept them here
451 framing.size = readWant_;
452 // determine size of this frame
453 try {
454 // Read from the socket
455 fetch = tSocket_->read(&framing.buf[readBufferPos_],
456 uint32_t(sizeof(framing.size) - readBufferPos_));
457 if (fetch == 0) {
458 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000459 close();
460 return;
461 }
David Reiss89a12942010-10-06 17:10:52 +0000462 readBufferPos_ += fetch;
463 } catch (TTransportException& te) {
464 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
465 close();
466
467 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000468 }
469
David Reiss89a12942010-10-06 17:10:52 +0000470 if (readBufferPos_ < sizeof(framing.size)) {
471 // more needed before frame size is known -- save what we have so far
472 readWant_ = framing.size;
473 return;
474 }
475
476 readWant_ = ntohl(framing.size);
Roger Meier3781c242011-12-11 20:07:21 +0000477 if (readWant_ > server_->getMaxFrameSize()) {
478 // Don't allow giant frame sizes. This prevents bad clients from
479 // causing us to try and allocate a giant buffer.
480 GlobalOutput.printf("TNonblockingServer: frame size too large "
481 "(%"PRIu32" > %zu) from client %s. remote side not "
482 "using TFramedTransport?",
483 readWant_, server_->getMaxFrameSize(),
484 tSocket_->getSocketInfo().c_str());
David Reiss89a12942010-10-06 17:10:52 +0000485 close();
486 return;
487 }
488 // size known; now get the rest of the frame
489 transition();
490 return;
491
492 case SOCKET_RECV:
493 // It is an error to be in this state if we already have all the data
494 assert(readBufferPos_ < readWant_);
495
David Reiss105961d2010-10-06 17:10:17 +0000496 try {
497 // Read from the socket
498 fetch = readWant_ - readBufferPos_;
499 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
500 }
501 catch (TTransportException& te) {
502 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
503 close();
Mark Slee79b16942007-11-26 19:05:29 +0000504
David Reiss105961d2010-10-06 17:10:17 +0000505 return;
506 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000507
Mark Slee2f6404d2006-10-10 01:37:40 +0000508 if (got > 0) {
509 // Move along in the buffer
510 readBufferPos_ += got;
511
512 // Check that we did not overdo it
513 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000514
Mark Slee2f6404d2006-10-10 01:37:40 +0000515 // We are done reading, move onto the next state
516 if (readBufferPos_ == readWant_) {
517 transition();
518 }
519 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000520 }
521
522 // Whenever we get down here it means a remote disconnect
523 close();
Mark Slee79b16942007-11-26 19:05:29 +0000524
Mark Slee2f6404d2006-10-10 01:37:40 +0000525 return;
526
527 case SOCKET_SEND:
528 // Should never have position past size
529 assert(writeBufferPos_ <= writeBufferSize_);
530
531 // If there is no data to send, then let us move on
532 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000533 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000534 transition();
535 return;
536 }
537
David Reiss105961d2010-10-06 17:10:17 +0000538 try {
539 left = writeBufferSize_ - writeBufferPos_;
540 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
541 }
542 catch (TTransportException& te) {
543 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000544 close();
545 return;
546 }
547
548 writeBufferPos_ += sent;
549
550 // Did we overdo it?
551 assert(writeBufferPos_ <= writeBufferSize_);
552
Mark Slee79b16942007-11-26 19:05:29 +0000553 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000554 if (writeBufferPos_ == writeBufferSize_) {
555 transition();
556 }
557
558 return;
559
560 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000561 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000562 assert(0);
563 }
564}
565
566/**
567 * This is called when the application transitions from one state into
568 * another. This means that it has finished writing the data that it needed
569 * to, or finished receiving the data that it needed to.
570 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state.
  // NOTE: several cases intentionally fall through (and one uses goto);
  // comments mark each such transition explicitly.
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        // Invoke the processor inline on the IO thread
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      // realloc (rather than malloc) preserves any partially-read data
      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_= 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}
758
/**
 * (Re)registers this connection's libevent event with the given flag set
 * (EV_READ / EV_WRITE, typically with EV_PERSIST).  Passing 0 deregisters
 * the event entirely.  No-op if the flags are already in effect.
 */
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // A previous event is registered; it must be deleted before event_set()
  // may be called again on the same struct.
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      // NOTE(review): on event_del failure we return without updating
      // eventFlags_, leaving the old registration in place.
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags; 0 means "deregistered".
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del().  The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed.  However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del().  You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
            TConnection::eventHandler, this);
  // Bind the event to this connection's IO thread's event base.
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event; a NULL timeout means "no timeout".
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}
817
/**
 * Closes a connection: deregisters its libevent event, notifies the server
 * event handler, closes socket and transports, and returns the (reusable)
 * TConnection object to the owning server's pool.
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent first so no further callbacks fire on
  // this (about-to-be-recycled) object.
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", errno);
  }

  // Let the application-level handler tear down any per-connection context.
  if (serverEventHandler_ != NULL) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  // Detach from the IO thread; a recycled connection gets a fresh one in init().
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}
842
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000843void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
844 size_t readLimit,
845 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000846 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000847 free(readBuffer_);
848 readBuffer_ = NULL;
849 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000850 }
David Reiss54bec5d2010-10-06 17:10:45 +0000851
852 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
853 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000854 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000855 largestWriteBufferSize_ = 0;
856 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000857}
858
David Reiss8ede8182010-09-02 15:26:28 +0000859TNonblockingServer::~TNonblockingServer() {
860 // TODO: We currently leak any active TConnection objects.
861 // Since we're shutting down and destroying the event_base, the TConnection
862 // objects will never receive any additional callbacks. (And even if they
863 // did, it would be bad, since they keep a pointer around to the server,
864 // which is being destroyed.)
865
866 // Clean up unused TConnection objects in connectionStack_
867 while (!connectionStack_.empty()) {
868 TConnection* connection = connectionStack_.top();
869 connectionStack_.pop();
870 delete connection;
871 }
David Reiss8ede8182010-09-02 15:26:28 +0000872}
873
Mark Slee2f6404d2006-10-10 01:37:40 +0000874/**
875 * Creates a new connection either by reusing an object off the stack or
876 * by allocating a new one entirely
877 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000878TNonblockingServer::TConnection* TNonblockingServer::createConnection(
Jake Farrellb0d95602011-12-06 01:17:26 +0000879 int socket, const sockaddr* addr, socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000880 // Check the stack
Jake Farrellb0d95602011-12-06 01:17:26 +0000881 Guard g(connMutex_);
882
883 // pick an IO thread to handle this connection -- currently round robin
Jake Farrellb0d95602011-12-06 01:17:26 +0000884 assert(nextIOThread_ < ioThreads_.size());
885 int selectedThreadIdx = nextIOThread_;
886 nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
887
888 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
889
890 // Check the connection stack to see if we can re-use
891 TConnection* result = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000892 if (connectionStack_.empty()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000893 result = new TConnection(socket, ioThread, addr, addrLen);
894 ++numTConnections_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000895 } else {
Jake Farrellb0d95602011-12-06 01:17:26 +0000896 result = connectionStack_.top();
Mark Slee2f6404d2006-10-10 01:37:40 +0000897 connectionStack_.pop();
Jake Farrellb0d95602011-12-06 01:17:26 +0000898 result->init(socket, ioThread, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000899 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000900 return result;
Mark Slee2f6404d2006-10-10 01:37:40 +0000901}
902
903/**
904 * Returns a connection to the stack
905 */
906void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000907 Guard g(connMutex_);
908
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000909 if (connectionStackLimit_ &&
910 (connectionStack_.size() >= connectionStackLimit_)) {
911 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000912 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000913 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000914 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000915 connectionStack_.push(connection);
916 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000917}
918
/**
 * Server socket had something happen.  We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 *
 * Called from the listen-owning IO thread (thread #0) when libevent reports
 * the listen socket readable.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  (void) which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  int clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      // Guard the drop counters; serverOverloaded() itself was called
      // outside the lock.
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        // Drop this client and stop accepting for now; remaining queued
        // connections will be picked up on the next readiness event.
        close(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          close(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
      close(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      close(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      clientConnection->notifyIOThread();
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }


  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
  }
}
1007
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves a wildcard address for port_ (preferring IPv6, since IPv4 can be
 * mapped into IPv6 space), creates and binds the socket, then hands it to
 * listenSocket() for option setup and listen().
 *
 * Throws TException on socket()/bind() failure; on getaddrinfo() failure it
 * only logs and returns, leaving serverSocket_ unset.
 */
void TNonblockingServer::createAndListenOnSocket() {
  int s;

  struct addrinfo hints, *res, *res0;
  int error;

  // Big enough for any valid port plus the NUL terminator.
  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Use snprintf rather than sprintf: if port_ were ever outside the
  // expected [0, 65535] range, sprintf could overflow the small buffer.
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  // Allow IPv4 clients on the IPv6 socket via mapped addresses.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1073
/**
 * Takes a socket created by createAndListenOnSocket() and sets various
 * options on it to prepare for use in the server, then calls listen() and
 * stores it as serverSocket_.  Throws TException on fcntl()/listen() failure.
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(s);
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
  #endif

  #ifdef TCP_LOW_MIN_RTO
  // Optionally lower TCP's minimum retransmission timeout (non-portable).
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
  #endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    close(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}
1116
David Reiss068f4162010-03-09 05:19:45 +00001117void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1118 threadManager_ = threadManager;
1119 if (threadManager != NULL) {
1120 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1121 threadPoolProcessing_ = true;
1122 } else {
1123 threadPoolProcessing_ = false;
1124 }
1125}
1126
/**
 * Determines whether the server is currently overloaded (too many active
 * processors or active connections), applying hysteresis so the overloaded
 * flag doesn't flap: once set, it clears only after load drops below
 * overloadHysteresis_ times the limits.
 *
 * NOTE(review): reads the counters without holding connMutex_ -- presumably
 * an approximate check is acceptable here; confirm against callers.
 */
bool TNonblockingServer::serverOverloaded() {
  // Connections not sitting on the idle stack are considered active.
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ ||
      activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    // Only leave the overloaded state once load has fallen well below the
    // limits (hysteresis), and report how many connections were dropped.
    if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf("TNonblockingServer: overload ended; "
                          "%u dropped (%llu total)",
                          nConnectionsDropped_, nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
1149
1150bool TNonblockingServer::drainPendingTask() {
1151 if (threadManager_) {
1152 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1153 if (task) {
1154 TConnection* connection =
1155 static_cast<TConnection::Task*>(task.get())->getTConnection();
1156 assert(connection && connection->getServer()
1157 && connection->getState() == APP_WAIT_TASK);
1158 connection->forceClose();
1159 return true;
1160 }
1161 }
1162 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001163}
1164
David Reiss068f4162010-03-09 05:19:45 +00001165void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1166 TConnection* connection =
1167 static_cast<TConnection::Task*>(task.get())->getTConnection();
Jake Farrellb0d95602011-12-06 01:17:26 +00001168 assert(connection && connection->getServer() &&
1169 connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001170 connection->forceClose();
1171}
1172
Jake Farrellb0d95602011-12-06 01:17:26 +00001173void TNonblockingServer::stop() {
1174 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001175 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001176 ioThreads_[i]->stop();
1177 }
1178}
1179
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 *
 * Creates the listen socket, builds numIOThreads_ IO threads (thread #0
 * owns the listen socket and runs in the calling thread), launches the
 * rest via a thread factory, and returns only after stop() breaks all the
 * event loops and every secondary thread has been joined.
 */
void TNonblockingServer::serve() {
  // init listen socket
  createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    int listenFd = (id == 0 ? serverSocket_ : -1);

    shared_ptr<TNonblockingIOThread> thread(
      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
                      port_, ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    // Boost's factory takes no scheduler/priority/stack arguments, hence
    // the conditional argument list.
    ioThreadFactory_.reset(new PlatformThreadFactory(
#ifndef USE_BOOST_THREAD
      PlatformThreadFactory::OTHER,   // scheduler
      PlatformThreadFactory::NORMAL,  // priority
      1,                              // stack size (MB)
#endif
      false                           // detached: false so we can join below
    ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0 (thread #0 runs inline below)
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}
1247
/**
 * Constructs an IO thread helper.
 *
 * @param server         owning server (non-owning back-pointer)
 * @param number         this thread's index; #0 owns the listen socket
 * @param listenSocket   listen fd for thread #0, -1 for all others
 * @param useHighPriority whether run() should request elevated scheduling
 *
 * The event base and notification pipe are created lazily in run() /
 * registerEvents(), so the pipe fds start out as -1 ("not created").
 */
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           int listenSocket,
                                           bool useHighPriority)
      : server_(server)
      , number_(number)
      , listenSocket_(listenSocket)
      , useHighPriority_(useHighPriority)
      , eventBase_(NULL) {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}
1260
/**
 * Destructor: joins the worker thread (if any), frees the libevent base,
 * and closes the listen socket and both notification-pipe fds it owns.
 */
TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_) {
    event_base_free(eventBase_);
  }

  // Thread #0 owns the listen socket; others have listenSocket_ == -1.
  if (listenSocket_ >= 0) {
    if (0 != close(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
                          errno);
    }
    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
  }

  // Close both ends of the notification pipe, if they were ever created.
  for (int i = 0; i < 2; ++i) {
    if (notificationPipeFDs_[i] >= 0) {
      if (0 != ::close(notificationPipeFDs_[i])) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            errno);
      }
      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
    }
  }
}
1287
/**
 * Creates the socketpair used to wake this IO thread from other threads,
 * making both ends non-blocking and close-on-exec.  Throws TException on
 * any failure (after closing whatever was created).
 */
void TNonblockingIOThread::createNotificationPipe() {
  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
    close(notificationPipeFDs_[0]);
    close(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
  }
  // Mark both fds close-on-exec.  Old libevent lacks the helper, so fall
  // back to fcntl there; note the #if/#else branches share the closing
  // brace of the if-statement below.
  for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
        fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
      close(notificationPipeFDs_[0]);
      close(notificationPipeFDs_[1]);
      throw TException("TNonblockingServer::createNotificationPipe() "
                       "FD_CLOEXEC");
    }
  }
}
1314
/**
 * Register the core libevent events onto the proper base.
 *
 * For thread #0 this registers the persistent listen event; for every
 * thread it creates the notification pipe and registers a persistent read
 * event on its receive end.  Throws TException if event_add() fails.
 */
void TNonblockingIOThread::registerEvents() {
  if (listenSocket_ >= 0) {
    // Register the server event (only the thread owning the listen socket)
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, 0)) {
      throw TException("TNonblockingServer::serve(): "
                       "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
                        number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): "
                     "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
                      number_);
}
1357
1358bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
1359 int fd = getNotificationSendFD();
1360 if (fd < 0) {
1361 return false;
1362 }
1363
1364 const int kSize = sizeof(conn);
Roger Meier12d70532011-12-14 23:35:28 +00001365 if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001366 return false;
1367 }
1368
1369 return true;
1370}
1371
/* static */
/**
 * libevent callback for the notification pipe: drains all queued pointers,
 * driving each connection's state machine via transition().  A NULL pointer
 * is the sentinel telling this thread to stop handling notifications.
 * Partial reads are treated as fatal (breakLoop(true) aborts the process).
 */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    const int kSize = sizeof(connection);
    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // Partial pointer read: the stream is now misaligned and cannot be
      // recovered, so escalate.
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (errno != EWOULDBLOCK && errno != EAGAIN) {
        GlobalOutput.perror(
          "TNonblocking: notifyHandler read() failed: ", errno);
        ioThread->breakLoop(true);
        return;
      }
      // EWOULDBLOCK/EAGAIN: pipe fully drained for now; exit the loop
      break;
    }
  }
}
1410
/**
 * Asks this IO thread's event loop to exit.  With error=true the whole
 * process is aborted instead (used when the notification stream is
 * unrecoverably corrupted).
 */
void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf(
      "TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // sets a flag so that the loop exits on the next event
  event_base_loopbreak(eventBase_);

  // event_base_loopbreak() only causes the loop to exit the next time
  // it wakes up.  We need to force it to wake up, in case there are
  // no real events it needs to process.
  //
  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(NULL);
  }
}
1436
/**
 * Switches the calling thread between normal (SCHED_OTHER) and elevated
 * (SCHED_FIFO, mid-range priority) scheduling.  Compiled to a no-op on
 * platforms without <sched.h>.  Failures are logged, not thrown.
 */
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*) &sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) +
                          sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf(
      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    // Typically EPERM: raising to SCHED_FIFO needs elevated privileges.
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
  }
#endif
}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001465
/**
 * Thread body: creates the event base, registers events (listen and/or
 * notification pipe), then runs the libevent loop until breakLoop() is
 * invoked, after which events are cleaned up and the function returns.
 */
void TNonblockingIOThread::run() {
  // Remember our identity so breakLoop() can tell whether it is being
  // called from this thread or from another one.
  threadId_ = Thread::get_current();

  assert(eventBase_ == 0);
  eventBase_ = event_base_new();

  // Print some libevent stats (once, from the primary thread)
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }


  registerEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
                      number_);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  // Run libevent engine; only returns once the loop is broken.
  // Invokes calls to eventHandler.
  event_base_loop(eventBase_, 0);

  // Drop back to normal priority before doing cleanup work.
  if (useHighPriority_) {
    setCurrentThreadHighPriority(false);
  }

  // cleans up our registered events
  cleanupEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
                      number_);
}
1502
1503void TNonblockingIOThread::cleanupEvents() {
1504 // stop the listen socket, if any
1505 if (listenSocket_ >= 0) {
1506 if (event_del(&serverEvent_) == -1) {
1507 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
1508 }
1509 }
1510
1511 event_del(&notificationEvent_);
1512}
1513
1514
/**
 * Requests this IO thread to exit its event loop as soon as possible.
 * Safe to call from any thread (breakLoop handles the cross-thread wakeup).
 */
void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}
1519
/**
 * Blocks until this IO thread's underlying thread finishes.  A no-op for
 * the primary thread (which has no factory-created thread_) and safe to
 * call multiple times; join errors are deliberately swallowed since this
 * runs during shutdown/destruction.
 */
void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch(...) {
      // swallow everything
    }
  }
}
1533
T Jake Lucianib5e62212009-01-31 22:36:20 +00001534}}} // apache::thrift::server