blob: 0e44ab2b90b5ff233eda61fe428388ff708bd1ce [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier3781c242011-12-11 20:07:21 +000020#define __STDC_FORMAT_MACROS
21
Roger Meier2fa9c312011-09-05 19:15:53 +000022#ifdef HAVE_CONFIG_H
23#include <config.h>
24#endif
25
Mark Slee2f6404d2006-10-10 01:37:40 +000026#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000027#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000028#include <transport/TSocket.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000029#include <concurrency/PosixThreadFactory.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000030
Mark Sleee02385b2007-06-09 01:21:16 +000031#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000032
33#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000035#endif
36
37#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000038#include <netinet/in.h>
39#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000040#endif
41
42#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000043#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000044#endif
45
46#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000047#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000048#endif
49
Roger Meier2fa9c312011-09-05 19:15:53 +000050#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000051#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000052#endif
53
Mark Slee2f6404d2006-10-10 01:37:40 +000054#include <errno.h>
55#include <assert.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000056#include <sched.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000057
David Reiss9b903442009-10-21 05:51:28 +000058#ifndef AF_LOCAL
59#define AF_LOCAL AF_UNIX
60#endif
61
T Jake Lucianib5e62212009-01-31 22:36:20 +000062namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000063
T Jake Lucianib5e62212009-01-31 22:36:20 +000064using namespace apache::thrift::protocol;
65using namespace apache::thrift::transport;
66using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000067using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000068using apache::thrift::transport::TSocket;
69using apache::thrift::transport::TTransportException;
Jake Farrellb0d95602011-12-06 01:17:26 +000070using boost::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000071
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,  ///< reading the 4-byte frame-length header
  SOCKET_RECV,          ///< reading the frame payload into readBuffer_
  SOCKET_SEND           ///< writing the response from writeBuffer_
};
78
/**
 * Application-level states for the nonblocking server:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for a thread-pool task to process the request (if pool mode)
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,               ///< fresh/reset connection, about to register a read
  APP_READ_FRAME_SIZE,    ///< waiting for the 4-byte length header
  APP_READ_REQUEST,       ///< waiting for the request payload
  APP_WAIT_TASK,          ///< request handed to the thread manager; idle until notified
  APP_SEND_RESULT,        ///< response bytes being written to the socket
  APP_CLOSE_CONNECTION    ///< tear down this connection immediately
};
95
/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 *
 * Lifecycle: constructed once per accepted socket; init() resets all
 * per-request state so the object can be reused across requests on the
 * same connection.
 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor that services requests on this connection
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object registered for this connection's socket
  struct event event_;

  /// Libevent flags currently registered (0 means idle/no event)
  short eventFlags_;

  /// Socket mode (framing header / payload / send)
  TSocketState socketState_;

  /// Application state (see TAppState)
  TAppState appState_;

  /// How much data needed to read; while the frame header is still
  /// incomplete this instead holds the partially-received header bytes
  /// (see workSocket SOCKET_RECV_FRAMING).
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer (malloc/realloc-managed; freed in the destructor)
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer (points into outputTransport_'s storage; not owned)
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Task handle
  int taskHandle_;

  /// Task event
  struct event taskEvent_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any (created by the server event handler)
  void *connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle (no libevent registration; used while a task runs)
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

  /// Close this connection and free or reset its resources.
  void close();

 public:

  class Task;

  /**
   * Constructor.
   *
   * Transports are allocated here exactly once; init() wires up the
   * per-request state and may be called again to recycle the connection.
   */
  TConnection(int socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(new TMemoryBuffer(
                                    server_->getWriteBufferDefaultSize()));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    // readBuffer_ is grown with std::realloc, so std::free is the matching
    // deallocator (free(NULL) is a no-op if nothing was ever read).
    std::free(readBuffer_);
  }

  /**
   * Check buffers against any size limits and shrink it if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize / reset all per-request state for a (possibly recycled) socket.
  void init(int socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check errno).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};
327
/**
 * Runnable handed to the thread manager when the server is in thread-pool
 * mode. Processes requests off the connection's input protocol until the
 * transport has no more buffered data, then notifies the connection's IO
 * thread so the libevent loop can resume driving the connection.
 */
class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    // Snapshot handler/context up front; run() executes on a pool thread.
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      // Keep processing as long as the input transport has buffered data;
      // a single frame may contain multiple pipelined requests.
      for (;;) {
        if (serverEventHandler_ != NULL) {
          // Give the event handler a chance per call (e.g. per-call context).
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      // Out of memory is treated as unrecoverable for the whole process.
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(-1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe.
    // This runs even when process() threw, so the IO thread always
    // regains control of the connection.
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
Mark Slee5ea15f92007-03-05 22:55:59 +0000383
/**
 * (Re)initialize this connection for a new socket.
 *
 * Called from the constructor and whenever a pooled TConnection object is
 * recycled for a freshly accepted socket. Resets all per-request state and
 * rebuilds the transport/protocol stacks from the server's factories.
 *
 * @param socket   accepted file descriptor for this connection.
 * @param ioThread IO thread that owns and drives this connection.
 * @param addr     peer address, cached on the TSocket for later reporting.
 * @param addrLen  length of addr.
 */
void TNonblockingServer::TConnection::init(int socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;       // no libevent registration yet

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
    inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
    outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
    factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
    factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_ != NULL) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor (must come after the protocols exist)
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
431
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000432void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000433 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000434 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000435
436 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000437 case SOCKET_RECV_FRAMING:
438 union {
439 uint8_t buf[sizeof(uint32_t)];
Roger Meier3781c242011-12-11 20:07:21 +0000440 uint32_t size;
David Reiss89a12942010-10-06 17:10:52 +0000441 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000442
David Reiss89a12942010-10-06 17:10:52 +0000443 // if we've already received some bytes we kept them here
444 framing.size = readWant_;
445 // determine size of this frame
446 try {
447 // Read from the socket
448 fetch = tSocket_->read(&framing.buf[readBufferPos_],
449 uint32_t(sizeof(framing.size) - readBufferPos_));
450 if (fetch == 0) {
451 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 close();
453 return;
454 }
David Reiss89a12942010-10-06 17:10:52 +0000455 readBufferPos_ += fetch;
456 } catch (TTransportException& te) {
457 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
458 close();
459
460 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 }
462
David Reiss89a12942010-10-06 17:10:52 +0000463 if (readBufferPos_ < sizeof(framing.size)) {
464 // more needed before frame size is known -- save what we have so far
465 readWant_ = framing.size;
466 return;
467 }
468
469 readWant_ = ntohl(framing.size);
Roger Meier3781c242011-12-11 20:07:21 +0000470 if (readWant_ > server_->getMaxFrameSize()) {
471 // Don't allow giant frame sizes. This prevents bad clients from
472 // causing us to try and allocate a giant buffer.
473 GlobalOutput.printf("TNonblockingServer: frame size too large "
474 "(%"PRIu32" > %zu) from client %s. remote side not "
475 "using TFramedTransport?",
476 readWant_, server_->getMaxFrameSize(),
477 tSocket_->getSocketInfo().c_str());
David Reiss89a12942010-10-06 17:10:52 +0000478 close();
479 return;
480 }
481 // size known; now get the rest of the frame
482 transition();
483 return;
484
485 case SOCKET_RECV:
486 // It is an error to be in this state if we already have all the data
487 assert(readBufferPos_ < readWant_);
488
David Reiss105961d2010-10-06 17:10:17 +0000489 try {
490 // Read from the socket
491 fetch = readWant_ - readBufferPos_;
492 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
493 }
494 catch (TTransportException& te) {
495 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
496 close();
Mark Slee79b16942007-11-26 19:05:29 +0000497
David Reiss105961d2010-10-06 17:10:17 +0000498 return;
499 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000500
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 if (got > 0) {
502 // Move along in the buffer
503 readBufferPos_ += got;
504
505 // Check that we did not overdo it
506 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000507
Mark Slee2f6404d2006-10-10 01:37:40 +0000508 // We are done reading, move onto the next state
509 if (readBufferPos_ == readWant_) {
510 transition();
511 }
512 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000513 }
514
515 // Whenever we get down here it means a remote disconnect
516 close();
Mark Slee79b16942007-11-26 19:05:29 +0000517
Mark Slee2f6404d2006-10-10 01:37:40 +0000518 return;
519
520 case SOCKET_SEND:
521 // Should never have position past size
522 assert(writeBufferPos_ <= writeBufferSize_);
523
524 // If there is no data to send, then let us move on
525 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000526 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000527 transition();
528 return;
529 }
530
David Reiss105961d2010-10-06 17:10:17 +0000531 try {
532 left = writeBufferSize_ - writeBufferPos_;
533 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
534 }
535 catch (TTransportException& te) {
536 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000537 close();
538 return;
539 }
540
541 writeBufferPos_ += sent;
542
543 // Did we overdo it?
544 assert(writeBufferPos_ <= writeBufferSize_);
545
Mark Slee79b16942007-11-26 19:05:29 +0000546 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000547 if (writeBufferPos_ == writeBufferSize_) {
548 transition();
549 }
550
551 return;
552
553 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000554 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000555 assert(0);
556 }
557}
558
/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 *
 * State-machine notes (the fall-throughs and the goto are intentional):
 *  - APP_READ_REQUEST falls through to APP_WAIT_TASK in the
 *    non-thread-pool path, because process() ran synchronously.
 *  - APP_WAIT_TASK jumps to LABEL_APP_INIT for oneway calls (no reply).
 *  - APP_SEND_RESULT falls through into APP_INIT to reset for the next
 *    request on the same connection.
 *  - server_->incrementActiveProcessors()/decrementActiveProcessors()
 *    must stay balanced across these paths.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        // Invoke the processor synchronously on the IO thread
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      // realloc keeps any previously-read bytes; matched by std::free in
      // the destructor.
      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_= 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}
751
/**
 * (Re)register this connection's libevent event with the given flags.
 *
 * Passing 0 leaves the connection idle (event deleted, nothing re-added).
 * The previous registration, if any, must be event_del'd before event_set
 * may safely reinitialize the struct event.
 *
 * @param eventFlags EV_READ/EV_WRITE (|EV_PERSIST), or 0 for idle.
 */
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
            TConnection::eventHandler, this);
  // Bind the event to this connection's IO thread's event_base, not the
  // default base.
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}
810
/**
 * Closes a connection.
 *
 * Unregisters the libevent event, notifies the server event handler (if
 * any) that the per-connection context is going away, closes the socket
 * and factory-produced transports, and finally returns this object to the
 * server's connection pool.  After this call the connection must not be
 * used until re-initialized via init().
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent so no further callbacks fire on us.
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", errno);
  }

  // Let the application tear down whatever context it attached to us.
  if (serverEventHandler_ != NULL) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  // Detach from the IO thread; a pooled connection is re-assigned on init().
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}
835
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000836void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
837 size_t readLimit,
838 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000839 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000840 free(readBuffer_);
841 readBuffer_ = NULL;
842 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000843 }
David Reiss54bec5d2010-10-06 17:10:45 +0000844
845 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
846 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000847 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000848 largestWriteBufferSize_ = 0;
849 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000850}
851
David Reiss8ede8182010-09-02 15:26:28 +0000852TNonblockingServer::~TNonblockingServer() {
853 // TODO: We currently leak any active TConnection objects.
854 // Since we're shutting down and destroying the event_base, the TConnection
855 // objects will never receive any additional callbacks. (And even if they
856 // did, it would be bad, since they keep a pointer around to the server,
857 // which is being destroyed.)
858
859 // Clean up unused TConnection objects in connectionStack_
860 while (!connectionStack_.empty()) {
861 TConnection* connection = connectionStack_.top();
862 connectionStack_.pop();
863 delete connection;
864 }
David Reiss8ede8182010-09-02 15:26:28 +0000865}
866
/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely.
 *
 * Also assigns the connection to an IO thread (round-robin) while holding
 * connMutex_, which protects both the pool and nextIOThread_.
 *
 * @param socket   accepted (already non-blocking) client socket
 * @param addr     peer address as written by accept()
 * @param addrLen  length of *addr
 * @return a ready-to-use TConnection owned by this server
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
    int socket, const sockaddr* addr, socklen_t addrLen) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    // Pool empty: allocate a fresh connection and count it.
    result = new TConnection(socket, ioThread, addr, addrLen);
    ++numTConnections_;
  } else {
    // Reuse a pooled connection; init() rebinds it to the new socket/thread.
    result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, ioThread, addr, addrLen);
  }
  return result;
}
895
896/**
897 * Returns a connection to the stack
898 */
899void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000900 Guard g(connMutex_);
901
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000902 if (connectionStackLimit_ &&
903 (connectionStack_.size() >= connectionStackLimit_)) {
904 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000905 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000906 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000907 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000908 connectionStack_.push(connection);
909 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000910}
911
/**
 * Server socket had something happen.  We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 *
 * Runs on IO thread #0 (the listener thread).  Accepts in a loop until
 * accept() would block, applying the configured overload policy to each
 * new socket before handing it to a TConnection.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  (void) which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  int clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        // Shed load by refusing this client outright.
        close(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          close(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
      close(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      close(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      clientConnection->notifyIOThread();
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }


  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
  }
}
1000
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves a wildcard address for port_ (preferring IPv6, since IPv4
 * addresses can be mapped into IPv6 space), creates and binds the socket,
 * then hands it to listenSocket() for the remaining setup.
 *
 * @throws TException if socket creation or bind fails.
 */
void TNonblockingServer::createAndListenOnSocket() {
  int s;

  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Use snprintf instead of sprintf: port_ is an int, and a value outside
  // the expected [0, 65535] range (e.g. negative) would overflow the small
  // stack buffer with sprintf.  snprintf always NUL-terminates and clips.
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  // Allow IPv4-mapped connections on the IPv6 wildcard socket.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1066
/**
 * Takes a socket created by createAndListenOnSocket() and sets various
 * options on it to prepare for use in the server: non-blocking mode,
 * keepalive, linger off, TCP_NODELAY where applicable, then listen().
 * On success stores the fd in serverSocket_.
 *
 * @param s a bound socket file descriptor
 * @throws TException if fcntl() or listen() fails (the fd is closed first)
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(s);
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
  #endif

  #ifdef TCP_LOW_MIN_RTO
  // Optional facebook-kernel extension: lower the minimum TCP RTO.
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
  #endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    close(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}
1109
David Reiss068f4162010-03-09 05:19:45 +00001110void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1111 threadManager_ = threadManager;
1112 if (threadManager != NULL) {
1113 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1114 threadPoolProcessing_ = true;
1115 } else {
1116 threadPoolProcessing_ = false;
1117 }
1118}
1119
/**
 * Determines whether the server is currently overloaded.
 *
 * Overload begins when active processors or active connections exceed
 * their maxima; it ends only once both fall back below the hysteresis
 * fraction of those maxima, which prevents rapid on/off flapping.
 * Transitions are logged, and drop counters are reset when the overload
 * episode ends.
 *
 * @return true while the server is in the overloaded state
 */
bool TNonblockingServer::serverOverloaded() {
  // Connections currently in use = total created minus those idle in the pool.
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ ||
      activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    // Only leave the overloaded state once BOTH metrics drop below the
    // hysteresis thresholds (a fraction of the hard maxima).
    if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf("TNonblockingServer: overload ended; "
                          "%u dropped (%llu total)",
                          nConnectionsDropped_, nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
1142
/**
 * Removes one pending (not yet running) task from the ThreadManager queue
 * and force-closes its connection, as part of the DRAIN_TASK_QUEUE
 * overload policy.
 *
 * @return true if a task was discarded, false if the queue was empty or
 *         no ThreadManager is configured
 */
bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      // Tasks queued by this server are always TConnection::Task instances.
      TConnection* connection =
        static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer()
             && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}
1157
David Reiss068f4162010-03-09 05:19:45 +00001158void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1159 TConnection* connection =
1160 static_cast<TConnection::Task*>(task.get())->getTConnection();
Jake Farrellb0d95602011-12-06 01:17:26 +00001161 assert(connection && connection->getServer() &&
1162 connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001163 connection->forceClose();
1164}
1165
Jake Farrellb0d95602011-12-06 01:17:26 +00001166void TNonblockingServer::stop() {
1167 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001168 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001169 ioThreads_[i]->stop();
1170 }
1171}
1172
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 *
 * Creates the listen socket, spins up numIOThreads_ IO threads (thread #0
 * runs in the caller's thread and owns the listen socket; the rest get
 * their own OS threads), and returns only after stop() has broken every
 * IO thread's event loop and all threads have been joined.
 */
void TNonblockingServer::serve() {
  // init listen socket
  createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    int listenFd = (id == 0 ? serverSocket_ : -1);

    shared_ptr<TNonblockingIOThread> thread(
      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
                      port_, ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new PosixThreadFactory(
      PosixThreadFactory::OTHER,  // scheduler
      PosixThreadFactory::NORMAL, // priority
      1,                          // stack size (MB)
      false                       // detached
    ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}
1238
/**
 * Constructs an IO thread descriptor (the OS thread itself starts later,
 * in serve(), and the event base is created lazily in run()).
 *
 * @param server          owning server (not owned by this object)
 * @param number          IO thread index; #0 is the listener thread
 * @param listenSocket    listen fd for thread #0, -1 for the others
 * @param useHighPriority whether run() should elevate scheduler priority
 *
 * NOTE(review): threadId_ is left uninitialized here and is only set in
 * run(); breakLoop() compares against it via pthread_equal(), so calling
 * breakLoop() before run() reads an indeterminate value — confirm callers
 * never do that.
 */
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           int listenSocket,
                                           bool useHighPriority)
      : server_(server)
      , number_(number)
      , listenSocket_(listenSocket)
      , useHighPriority_(useHighPriority)
      , eventBase_(NULL) {
  // Notification pipe fds start invalid; created in createNotificationPipe().
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}
1251
/**
 * Destructor: joins the associated OS thread (if any), frees the libevent
 * base, and closes the listen socket and notification pipe fds, marking
 * each as invalid after closing.
 */
TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_) {
    event_base_free(eventBase_);
  }

  if (listenSocket_ >= 0) {
    if (0 != close(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
                          errno);
    }
    listenSocket_ = TNonblockingServer::INVALID_SOCKET;
  }

  // Close both ends of the notification pipe.
  for (int i = 0; i < 2; ++i) {
    if (notificationPipeFDs_[i] >= 0) {
      if (0 != ::close(notificationPipeFDs_[i])) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            errno);
      }
      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
    }
  }
}
1278
1279void TNonblockingIOThread::createNotificationPipe() {
1280 if (pipe(notificationPipeFDs_) != 0) {
1281 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
1282 throw TException("can't create notification pipe");
1283 }
1284 int flags;
1285 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
1286 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
1287 close(notificationPipeFDs_[0]);
1288 close(notificationPipeFDs_[1]);
1289 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1290 }
1291 for (int i = 0; i < 2; ++i) {
1292 if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
1293 fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
1294 close(notificationPipeFDs_[0]);
1295 close(notificationPipeFDs_[1]);
1296 throw TException("TNonblockingServer::createNotificationPipe() "
1297 "FD_CLOEXEC");
1298 }
1299 }
1300}
1301
/**
 * Register the core libevent events onto the proper base.
 *
 * Thread #0 additionally registers the persistent read event for the
 * listen socket; every thread registers a persistent read event on its
 * notification pipe so other threads can wake it.
 *
 * @throws TException if event_add() fails for either event
 */
void TNonblockingIOThread::registerEvents() {
  if (listenSocket_ >= 0) {
    // Register the server event (only the listener thread has a socket).
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, 0)) {
      throw TException("TNonblockingServer::serve(): "
                       "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
                        number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): "
                     "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
                      number_);
}
1344
1345bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
1346 int fd = getNotificationSendFD();
1347 if (fd < 0) {
1348 return false;
1349 }
1350
1351 const int kSize = sizeof(conn);
1352 if (write(fd, &conn, kSize) != kSize) {
1353 return false;
1354 }
1355
1356 return true;
1357}
1358
/* static */
/**
 * libevent callback for the notification pipe's read end.
 *
 * Drains the pipe in a loop: each complete sizeof(pointer) read is either
 * a connection to transition() or the NULL sentinel meaning "stop this
 * thread".  A short read means the pipe framing is broken, which is
 * treated as fatal (breakLoop(true) aborts the process).
 *
 * @param fd    the notification pipe's receive fd
 * @param which libevent event flags (unused)
 * @param v     the owning TNonblockingIOThread
 */
void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    const int kSize = sizeof(connection);
    int nBytes = read(fd, &connection, kSize);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      // Write end closed: no more notifications will ever arrive.
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (errno != EWOULDBLOCK && errno != EAGAIN) {
        GlobalOutput.perror(
          "TNonblocking: notifyHandler read() failed: ", errno);
        ioThread->breakLoop(true);
        return;
      }
      // Pipe fully drained (non-blocking read would block); exit the loop.
      break;
    }
  }
}
1397
/**
 * Breaks this thread out of its libevent loop.
 *
 * @param error if true, the break is due to an unrecoverable problem and
 *              the whole process is aborted (see TODO below).
 */
void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf(
      "TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // sets a flag so that the loop exits on the next event
  event_base_loopbreak(eventBase_);

  // event_base_loopbreak() only causes the loop to exit the next time
  // it wakes up.  We need to force it to wake up, in case there are
  // no real events it needs to process.
  //
  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!pthread_equal(pthread_self(), threadId_)) {
    notify(NULL);
  }
}
1423
1424void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
1425 // Start out with a standard, low-priority setup for the sched params.
1426 struct sched_param sp;
1427 bzero((void*) &sp, sizeof(sp));
1428 int policy = SCHED_OTHER;
1429
1430 // If desired, set up high-priority sched params structure.
1431 if (value) {
1432 // FIFO scheduler, ranked above default SCHED_OTHER queue
1433 policy = SCHED_FIFO;
1434 // The priority only compares us to other SCHED_FIFO threads, so we
1435 // just pick a random priority halfway between min & max.
1436 const int priority = (sched_get_priority_max(policy) +
1437 sched_get_priority_min(policy)) / 2;
1438
1439 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001440 }
1441
Jake Farrellb0d95602011-12-06 01:17:26 +00001442 // Actually set the sched params for the current thread.
1443 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
1444 GlobalOutput.printf(
1445 "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
1446 } else {
1447 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
1448 }
1449}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001450
/**
 * Body of an IO thread: creates the event base, registers the listen
 * and notification events, then runs the libevent loop until breakLoop()
 * is invoked.  Also records this thread's pthread id for breakLoop()'s
 * same-thread check, and applies the high-priority scheduler if requested.
 */
void TNonblockingIOThread::run() {
  // Remember our id so breakLoop() can tell whether it runs on this thread.
  threadId_ = pthread_self();

  assert(eventBase_ == 0);
  eventBase_ = event_base_new();

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }


  registerEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
                      number_);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  // Run libevent engine, never returns, invokes calls to eventHandler
  event_base_loop(eventBase_, 0);

  // Restore default priority before returning control to the caller.
  if (useHighPriority_) {
    setCurrentThreadHighPriority(false);
  }

  // cleans up our registered events
  cleanupEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
                      number_);
}
1487
1488void TNonblockingIOThread::cleanupEvents() {
1489 // stop the listen socket, if any
1490 if (listenSocket_ >= 0) {
1491 if (event_del(&serverEvent_) == -1) {
1492 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
1493 }
1494 }
1495
1496 event_del(&notificationEvent_);
1497}
1498
1499
/**
 * Asks this IO thread to exit its event loop cleanly (non-error path).
 */
void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}
1504
/**
 * Waits for this IO thread's OS thread to finish, if one was created.
 * Safe to call multiple times and from the thread itself; any exception
 * from the underlying join is deliberately swallowed.
 */
void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch(...) {
      // swallow everything
    }
  }
}
1518
T Jake Lucianib5e62212009-01-31 22:36:20 +00001519}}} // apache::thrift::server