/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#define __STDC_FORMAT_MACROS

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <iostream>

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#if !defined(PRIu32)
#define PRIu32 "I32u"
#define PRIu64 "I64u"
#endif

namespace apache { namespace thrift { namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using boost::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,
  SOCKET_RECV,
  SOCKET_SEND
};

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a dispatched task to complete (when using a thread pool)
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
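
/*
 * Typical connection lifecycle (see TConnection::transition() below): a
 * connection starts in APP_INIT, reads the 4-byte frame header
 * (APP_READ_FRAME_SIZE), reads the frame body (APP_READ_REQUEST), optionally
 * parks in APP_WAIT_TASK while a thread-pool Task runs the processor, writes
 * the framed response (APP_SEND_RESULT), and then falls back into APP_INIT
 * for the next request. APP_CLOSE_CONNECTION short-circuits this loop and
 * tears the connection down immediately.
 */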

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data we need to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void *connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

 public:

  class Task;

  /// Constructor
  TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on each init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    std::free(readBuffer_);
  }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};

class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
    inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
    outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
    factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
    factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf("TNonblockingServer: frame size too large "
                          "(%" PRIu32 " > %" PRIu64 ") from client %s. "
                          "Remote side not using TFramedTransport?",
                          readWant_,
                          (uint64_t)server_->getMaxFrameSize(),
                          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();
    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}
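
/*
 * Wire format handled above: each request and response is framed as a 4-byte
 * length prefix in network byte order followed by exactly that many payload
 * bytes (the same layout TFramedTransport produces). A minimal sketch of what
 * a client-side writer does with a raw byte buffer (illustrative only, not
 * taken from this file):
 *
 *   uint32_t frameSize = htonl(static_cast<uint32_t>(payload.size()));
 *   send(sock, &frameSize, 4, 0);                    // length prefix
 *   send(sock, payload.data(), payload.size(), 0);   // payload bytes
 *
 * workSocket() reads those 4 bytes in SOCKET_RECV_FRAMING, the payload in
 * SOCKET_RECV, and rejects frames larger than getMaxFrameSize().
 */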

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_,
                                              getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here; the call to process() has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
            TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
  }

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
    size_t readLimit,
    size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}
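
/*
 * The limits consulted above are tunable on the server object. A minimal
 * sketch, assuming the setters declared alongside the getters used here
 * (getIdleReadBufferLimit() etc.) in TNonblockingServer.h:
 *
 *   server.setIdleReadBufferLimit(64 * 1024);   // shrink read buffers above 64 KB
 *   server.setIdleWriteBufferLimit(64 * 1024);  // reset oversized write buffers
 *   server.setResizeBufferEveryN(512);          // only check every 512 calls
 *
 * Checking only every N calls keeps this housekeeping off the per-request
 * fast path.
 */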

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(boost::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
    THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread, addr, addrLen);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, ioThread, addr, addrLen);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());

  if (connectionStackLimit_ &&
      (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void) which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  THRIFT_SOCKET clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one; this helps us avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        ::THRIFT_CLOSESOCKET(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          ::THRIFT_CLOSESOCKET(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
        THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      clientConnection->notifyIOThread();
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }


  // Done looping accept; now we have to make sure the error is due to
  // blocking. Any other error is a problem.
  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
  }
}
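
/*
 * Overload handling above is driven by overloadAction_ and the connection /
 * processor limits. A minimal configuration sketch, assuming the
 * corresponding setters exposed in TNonblockingServer.h (the setter names
 * follow the members used above and are an assumption, not verified here):
 *
 *   server.setMaxConnections(10000);
 *   server.setMaxActiveProcessors(500);
 *   server.setOverloadAction(T_OVERLOAD_DRAIN_TASK_QUEUE);
 *
 * With T_OVERLOAD_DRAIN_TASK_QUEUE the server discards one queued task per
 * new connection while overloaded; with T_OVERLOAD_CLOSE_ON_ACCEPT it simply
 * closes the freshly accepted socket.
 */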

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  THRIFT_SOCKET s;

  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    throw TException("TNonblockingServer::serve() getaddrinfo " +
                     string(THRIFT_GAI_STRERROR(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    freeaddrinfo(res0);
    throw TTransportException(TTransportException::NOT_OPEN,
                              "TNonblockingServer::serve() bind",
                              THRIFT_GET_SOCKET_ERROR);
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes a socket created by createAndListenOnSocket() and sets various
 * options on it to prepare for use in the server.
 */
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
      THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
    ::THRIFT_CLOSESOCKET(s);
    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
  #endif

  #ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
  #endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}

void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}
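
/*
 * Task-queue mode is opt-in. A minimal sketch of wiring in a thread pool,
 * assuming the standard concurrency helpers shipped with Thrift
 * (ThreadManager::newSimpleThreadManager and PlatformThreadFactory):
 *
 *   boost::shared_ptr<ThreadManager> tm =
 *       ThreadManager::newSimpleThreadManager(8);           // 8 worker threads
 *   tm->threadFactory(
 *       boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
 *   tm->start();
 *   server.setThreadManager(tm);
 *
 * Without a ThreadManager the processor runs inline on the IO thread that
 * owns the connection.
 */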

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ ||
      activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf("TNonblockingServer: overload ended; "
                          "%u dropped (%llu total)",
                          nConnectionsDropped_, nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
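
/*
 * The hysteresis factor keeps the server from oscillating in and out of the
 * overloaded state. Worked example (illustrative numbers only): with
 * maxActiveProcessors_ = 100 and overloadHysteresis_ = 0.8, overload begins
 * once more than 100 processors are active and only ends again once the
 * count has dropped to 80 (0.8 * 100) or fewer.
 */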
1160
1161bool TNonblockingServer::drainPendingTask() {
1162 if (threadManager_) {
1163 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1164 if (task) {
1165 TConnection* connection =
1166 static_cast<TConnection::Task*>(task.get())->getTConnection();
1167 assert(connection && connection->getServer()
1168 && connection->getState() == APP_WAIT_TASK);
1169 connection->forceClose();
1170 return true;
1171 }
1172 }
1173 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001174}
1175
David Reiss068f4162010-03-09 05:19:45 +00001176void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1177 TConnection* connection =
1178 static_cast<TConnection::Task*>(task.get())->getTConnection();
Jake Farrellb0d95602011-12-06 01:17:26 +00001179 assert(connection && connection->getServer() &&
1180 connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001181 connection->forceClose();
1182}
1183
Jake Farrellb0d95602011-12-06 01:17:26 +00001184void TNonblockingServer::stop() {
1185 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001186 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001187 ioThreads_[i]->stop();
1188 }
1189}
1190
Roger Meier6f2a5032013-07-08 23:35:25 +02001191void TNonblockingServer::registerEvents(event_base* user_event_base) {
1192 userEventBase_ = user_event_base;
1193
Jake Farrellb0d95602011-12-06 01:17:26 +00001194 // init listen socket
Roger Meiere802aa42013-07-19 21:10:54 +02001195 if (serverSocket_ == THRIFT_INVALID_SOCKET)
Roger Meier6f2a5032013-07-08 23:35:25 +02001196 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001197
Jake Farrellb0d95602011-12-06 01:17:26 +00001198 // set up the IO threads
1199 assert(ioThreads_.empty());
1200 if (!numIOThreads_) {
1201 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001202 }
1203
Roger Meierd0cdecf2011-12-08 19:34:01 +00001204 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001205 // the first IO thread also does the listening on server socket
Roger Meier0be9ffa2013-07-19 21:10:01 +02001206 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
Mark Slee2f6404d2006-10-10 01:37:40 +00001207
Jake Farrellb0d95602011-12-06 01:17:26 +00001208 shared_ptr<TNonblockingIOThread> thread(
1209 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
1210 ioThreads_.push_back(thread);
1211 }
1212
1213 // Notify handler of the preServe event
Roger Meier72957452013-06-29 00:28:50 +02001214 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001215 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001216 }
1217
Jake Farrellb0d95602011-12-06 01:17:26 +00001218 // Start all of our helper IO threads. Note that the threads run forever,
1219 // only terminating if stop() is called.
1220 assert(ioThreads_.size() == numIOThreads_);
1221 assert(ioThreads_.size() > 0);
1222
1223 GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
1224 port_, ioThreads_.size());
1225
1226 // Launch all the secondary IO threads in separate threads
1227 if (ioThreads_.size() > 1) {
Roger Meier12d70532011-12-14 23:35:28 +00001228 ioThreadFactory_.reset(new PlatformThreadFactory(
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001229#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
Roger Meier12d70532011-12-14 23:35:28 +00001230 PlatformThreadFactory::OTHER, // scheduler
1231 PlatformThreadFactory::NORMAL, // priority
Jake Farrellb0d95602011-12-06 01:17:26 +00001232 1, // stack size (MB)
Roger Meier12d70532011-12-14 23:35:28 +00001233#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001234 false // detached
1235 ));
1236
1237 assert(ioThreadFactory_.get());
1238
1239 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001240 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001241 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1242 ioThreads_[i]->setThread(thread);
1243 thread->start();
1244 }
1245 }
1246
Roger Meier6f2a5032013-07-08 23:35:25 +02001247 // Register the events for the primary (listener) IO thread
1248 ioThreads_[0]->registerEvents();
1249}
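// A minimal embedding sketch, assuming the default single IO thread and that
// the caller owns and drives the event_base (error handling omitted): instead
// of calling serve(), the server's listen and notification events can be
// registered on an existing base which the caller then dispatches itself.
//
//   event_base* base = event_base_new();
//   server.registerEvents(base);   // listen + notification events on this base
//   event_base_dispatch(base);     // caller-driven loop; returns on loopbreak
//   event_base_free(base);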
1250
1251/**
1252 * Main workhorse function: starts the server listening on its port and
1253 * runs the libevent event loop until the server is shut down.
1254 */
1255void TNonblockingServer::serve() {
1256
1257 registerEvents(NULL);
1258
Jake Farrellb0d95602011-12-06 01:17:26 +00001259 // Run the primary (listener) IO thread loop in our main thread; this will
1260 // only return when the server is shutting down.
1261 ioThreads_[0]->run();
1262
1263 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001264 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001265 ioThreads_[i]->join();
1266 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1267 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001268}
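// Typical usage sketch; MyServiceHandler/MyServiceProcessor stand in for
// user-written and thrift-generated classes and are hypothetical names here:
//
//   boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
//   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
//   TNonblockingServer server(processor, 9090);
//   server.setNumIOThreads(4);   // optional; defaults to DEFAULT_IO_THREADS
//   server.serve();              // blocks until stop() is called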
1269
Jake Farrellb0d95602011-12-06 01:17:26 +00001270TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1271 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001272 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +00001273 bool useHighPriority)
1274 : server_(server)
1275 , number_(number)
1276 , listenSocket_(listenSocket)
1277 , useHighPriority_(useHighPriority)
Roger Meier6f2a5032013-07-08 23:35:25 +02001278 , eventBase_(NULL)
1279 , ownEventBase_(false) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001280 notificationPipeFDs_[0] = -1;
1281 notificationPipeFDs_[1] = -1;
1282}
1283
1284TNonblockingIOThread::~TNonblockingIOThread() {
1285 // make sure our associated thread is fully finished
1286 join();
1287
Roger Meier6f2a5032013-07-08 23:35:25 +02001288 if (eventBase_ && ownEventBase_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001289 event_base_free(eventBase_);
Roger Meier6f2a5032013-07-08 23:35:25 +02001290 ownEventBase_ = false;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001291 }
1292
Jake Farrellb0d95602011-12-06 01:17:26 +00001293 if (listenSocket_ >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001294 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001295 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001296 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001297 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001298 listenSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001299 }
1300
1301 for (int i = 0; i < 2; ++i) {
1302 if (notificationPipeFDs_[i] >= 0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001303 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001304 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001305 THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001306 }
Roger Meier0be9ffa2013-07-19 21:10:01 +02001307 notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +00001308 }
1309 }
1310}
1311
1312void TNonblockingIOThread::createNotificationPipe() {
Roger Meier12d70532011-12-14 23:35:28 +00001313 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1314 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
Jake Farrellb0d95602011-12-06 01:17:26 +00001315 throw TException("can't create notification pipe");
1316 }
Roger Meier12d70532011-12-14 23:35:28 +00001317 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
1318 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001319 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1320 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1321 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
Jake Farrellb0d95602011-12-06 01:17:26 +00001322 }
1323 for (int i = 0; i < 2; ++i) {
Roger Meier12d70532011-12-14 23:35:28 +00001324#if LIBEVENT_VERSION_NUMBER < 0x02000000
1325 int flags;
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001326 if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
1327 THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
Roger Meier12d70532011-12-14 23:35:28 +00001328#else
1329 if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
1330#endif
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001331 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1332 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
Jake Farrellb0d95602011-12-06 01:17:26 +00001333 throw TException("TNonblockingServer::createNotificationPipe() "
1334 "FD_CLOEXEC");
1335 }
1336 }
1337}
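// Note on createNotificationPipe(): a socketpair (rather than an anonymous
// pipe) is used so the same wake-up mechanism also works on Windows, where
// the socket-based libevent backends can only poll sockets; both ends are
// made non-blocking and close-on-exec before they are used.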
1338
1339/**
1340 * Register the core libevent events onto the proper base.
1341 */
1342void TNonblockingIOThread::registerEvents() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001343 threadId_ = Thread::get_current();
1344
1345 assert(eventBase_ == 0);
1346 eventBase_ = getServer()->getUserEventBase();
1347 if (eventBase_ == NULL) {
1348 eventBase_ = event_base_new();
1349 ownEventBase_ = true;
1350 }
1351
1352 // Print some libevent stats
1353 if (number_ == 0) {
1354 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
1355 event_get_version(),
1356 event_base_get_method(eventBase_));
1357 }
1358
Jake Farrellb0d95602011-12-06 01:17:26 +00001359 if (listenSocket_ >= 0) {
1360 // Register the server event
1361 event_set(&serverEvent_,
1362 listenSocket_,
1363 EV_READ | EV_PERSIST,
1364 TNonblockingIOThread::listenHandler,
1365 server_);
1366 event_base_set(eventBase_, &serverEvent_);
1367
1368 // Add the event and start up the server
1369 if (-1 == event_add(&serverEvent_, 0)) {
1370 throw TException("TNonblockingServer::serve(): "
1371 "event_add() failed on server listen event");
1372 }
1373 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
1374 number_);
1375 }
1376
1377 createNotificationPipe();
1378
1379 // Create an event to be notified when a task finishes
1380 event_set(&notificationEvent_,
1381 getNotificationRecvFD(),
1382 EV_READ | EV_PERSIST,
1383 TNonblockingIOThread::notifyHandler,
1384 this);
1385
1386 // Attach to the base
1387 event_base_set(eventBase_, &notificationEvent_);
1388
1389  // Add the notification event
1390 if (-1 == event_add(&notificationEvent_, 0)) {
1391 throw TException("TNonblockingServer::serve(): "
1392 "event_add() failed on task-done notification event");
1393 }
1394 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
1395 number_);
1396}
1397
1398bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001399 THRIFT_SOCKET fd = getNotificationSendFD();
Jake Farrellb0d95602011-12-06 01:17:26 +00001400 if (fd < 0) {
1401 return false;
1402 }
1403
1404 const int kSize = sizeof(conn);
Roger Meier12d70532011-12-14 23:35:28 +00001405 if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001406 return false;
1407 }
1408
1409 return true;
1410}
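// Notification wire format, as implemented by notify() above and
// notifyHandler() below: every message is exactly sizeof(TConnection*) bytes
// and carries the raw pointer value. A non-NULL pointer asks the IO thread to
// run connection->transition(); a NULL pointer is the "wake up / stop" signal
// sent by breakLoop() when the loop must be interrupted from another thread.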
1411
1412/* static */
Roger Meier12d70532011-12-14 23:35:28 +00001413void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001414 TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
1415 assert(ioThread);
Roger Meierd0cdecf2011-12-08 19:34:01 +00001416 (void)which;
Jake Farrellb0d95602011-12-06 01:17:26 +00001417
1418 while (true) {
1419 TNonblockingServer::TConnection* connection = 0;
1420 const int kSize = sizeof(connection);
Ben Craig64935232013-10-09 15:21:38 -05001421 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
Jake Farrellb0d95602011-12-06 01:17:26 +00001422 if (nBytes == kSize) {
1423 if (connection == NULL) {
1424 // this is the command to stop our thread, exit the handler!
1425 return;
1426 }
1427 connection->transition();
1428 } else if (nBytes > 0) {
1429 // throw away these bytes and hope that next time we get a solid read
1430      GlobalOutput.printf("notifyHandler: Bad read of %ld bytes, wanted %d",
1431 nBytes, kSize);
1432 ioThread->breakLoop(true);
1433 return;
1434 } else if (nBytes == 0) {
1435 GlobalOutput.printf("notifyHandler: Notify socket closed!");
1436 // exit the loop
1437 break;
1438 } else { // nBytes < 0
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001439 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001440 GlobalOutput.perror(
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001441 "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001442 ioThread->breakLoop(true);
1443 return;
1444 }
1445 // exit the loop
1446 break;
1447 }
1448 }
1449}
1450
1451void TNonblockingIOThread::breakLoop(bool error) {
1452 if (error) {
1453 GlobalOutput.printf(
1454 "TNonblockingServer: IO thread #%d exiting with error.", number_);
1455 // TODO: figure out something better to do here, but for now kill the
1456 // whole process.
1457 GlobalOutput.printf("TNonblockingServer: aborting process.");
1458 ::abort();
1459 }
1460
1461 // sets a flag so that the loop exits on the next event
Bryan Duxbury76c43682011-08-24 21:26:48 +00001462 event_base_loopbreak(eventBase_);
1463
Jake Farrellb0d95602011-12-06 01:17:26 +00001464 // event_base_loopbreak() only causes the loop to exit the next time
1465 // it wakes up. We need to force it to wake up, in case there are
1466 // no real events it needs to process.
Bryan Duxbury76c43682011-08-24 21:26:48 +00001467 //
Jake Farrellb0d95602011-12-06 01:17:26 +00001468 // If we're running in the same thread, we can't use the notify(0)
1469 // mechanism to stop the thread, but happily if we're running in the
1470 // same thread, this means the thread can't be blocking in the event
1471 // loop either.
Roger Meier12d70532011-12-14 23:35:28 +00001472 if (!Thread::is_current(threadId_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001473 notify(NULL);
1474 }
1475}
1476
1477void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
Roger Meier12d70532011-12-14 23:35:28 +00001478#ifdef HAVE_SCHED_H
Jake Farrellb0d95602011-12-06 01:17:26 +00001479 // Start out with a standard, low-priority setup for the sched params.
1480 struct sched_param sp;
1481 bzero((void*) &sp, sizeof(sp));
1482 int policy = SCHED_OTHER;
1483
1484 // If desired, set up high-priority sched params structure.
1485 if (value) {
1486 // FIFO scheduler, ranked above default SCHED_OTHER queue
1487 policy = SCHED_FIFO;
1488 // The priority only compares us to other SCHED_FIFO threads, so we
1489      // simply pick the midpoint between the min and max priority.
1490 const int priority = (sched_get_priority_max(policy) +
1491 sched_get_priority_min(policy)) / 2;
1492
1493 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001494 }
1495
Jake Farrellb0d95602011-12-06 01:17:26 +00001496 // Actually set the sched params for the current thread.
1497 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
1498 GlobalOutput.printf(
1499 "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
1500 } else {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001501 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001502 }
Roger Meierd051ca02013-08-15 01:35:11 +02001503#else
1504 THRIFT_UNUSED_VARIABLE(value);
Roger Meier12d70532011-12-14 23:35:28 +00001505#endif
Jake Farrellb0d95602011-12-06 01:17:26 +00001506}
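// Note: switching to SCHED_FIFO normally requires elevated privileges
// (root, or CAP_SYS_NICE on Linux). If the process lacks them,
// pthread_setschedparam() fails, the failure is logged above, and the IO
// thread simply keeps running at normal priority.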
Bryan Duxbury76c43682011-08-24 21:26:48 +00001507
Jake Farrellb0d95602011-12-06 01:17:26 +00001508void TNonblockingIOThread::run() {
Roger Meier6f2a5032013-07-08 23:35:25 +02001509 if (eventBase_ == NULL)
1510 registerEvents();
Jake Farrellb0d95602011-12-06 01:17:26 +00001511
1512 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
1513 number_);
1514
1515 if (useHighPriority_) {
1516 setCurrentThreadHighPriority(true);
1517 }
1518
1519 // Run libevent engine, never returns, invokes calls to eventHandler
1520 event_base_loop(eventBase_, 0);
1521
1522 if (useHighPriority_) {
1523 setCurrentThreadHighPriority(false);
1524 }
1525
1526 // cleans up our registered events
1527 cleanupEvents();
1528
1529 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
1530 number_);
1531}
1532
1533void TNonblockingIOThread::cleanupEvents() {
1534  // remove the listen event, if we registered one
1535 if (listenSocket_ >= 0) {
1536 if (event_del(&serverEvent_) == -1) {
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -04001537 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
Jake Farrellb0d95602011-12-06 01:17:26 +00001538 }
1539 }
1540
1541 event_del(&notificationEvent_);
1542}
1543
1544
1545void TNonblockingIOThread::stop() {
1546 // This should cause the thread to fall out of its event loop ASAP.
1547 breakLoop(false);
1548}
1549
1550void TNonblockingIOThread::join() {
1551 // If this was a thread created by a factory (not the thread that called
1552 // serve()), we join() it to make sure we shut down fully.
1553 if (thread_) {
1554 try {
1555      // Note that it is safe both to join() twice and to join the current
1556      // thread; the pthread implementation checks for deadlock in either case.
1557 thread_->join();
1558 } catch(...) {
1559 // swallow everything
1560 }
1561 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001562}
1563
T Jake Lucianib5e62212009-01-31 22:36:20 +00001564}}} // apache::thrift::server