blob: 1bf4e68e50fd8f69dcebead7bd21f532adf505b5 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000055 connection_(connection),
56 serverEventHandler_(connection_->getServerEventHandler()),
57 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000058
59 void run() {
60 try {
David Reiss105961d2010-10-06 17:10:17 +000061 for (;;) {
62 if (serverEventHandler_ != NULL) {
63 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
64 }
65 if (!processor_->process(input_, output_, connectionContext_) ||
66 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000067 break;
68 }
69 }
70 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000071 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000072 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000073 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000074 } catch (bad_alloc&) {
75 cerr << "TNonblockingServer caught bad_alloc exception.";
76 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000077 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000078 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000079 }
Mark Slee79b16942007-11-26 19:05:29 +000080
David Reiss01fe1532010-03-09 05:19:25 +000081 // Signal completion back to the libevent thread via a pipe
82 if (!connection_->notifyServer()) {
83 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000084 }
David Reiss01fe1532010-03-09 05:19:25 +000085 }
86
87 TConnection* getTConnection() {
88 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000089 }
90
91 private:
92 boost::shared_ptr<TProcessor> processor_;
93 boost::shared_ptr<TProtocol> input_;
94 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000095 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +000096 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
97 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +000098};
Mark Slee5ea15f92007-03-05 22:55:59 +000099
David Reiss105961d2010-10-06 17:10:17 +0000100void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
101 const sockaddr* addr, socklen_t addrLen) {
102 tSocket_->setSocketFD(socket);
103 tSocket_->setCachedAddress(addr, addrLen);
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 server_ = s;
106 appState_ = APP_INIT;
107 eventFlags_ = 0;
108
109 readBufferPos_ = 0;
110 readWant_ = 0;
111
112 writeBuffer_ = NULL;
113 writeBufferSize_ = 0;
114 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000115 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000116
117 socketState_ = SOCKET_RECV;
118 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000119 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000120
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 // Set flags, which also registers the event
122 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000123
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000124 // get input/transports
125 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
126 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000127
128 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000129 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
130 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000131
132 // Set up for any server event handler
133 serverEventHandler_ = server_->getEventHandler();
134 if (serverEventHandler_ != NULL) {
135 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
136 } else {
137 connectionContext_ = NULL;
138 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000139}
140
141void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000142 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000143 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000144
145 switch (socketState_) {
146 case SOCKET_RECV:
147 // It is an error to be in this state if we already have all the data
148 assert(readBufferPos_ < readWant_);
149
Mark Slee2f6404d2006-10-10 01:37:40 +0000150 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000151 if (readWant_ > readBufferSize_) {
David Reiss472fffb2010-03-09 05:20:24 +0000152 uint32_t newSize = readBufferSize_;
153 while (readWant_ > newSize) {
154 newSize *= 2;
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 }
David Reiss472fffb2010-03-09 05:20:24 +0000156 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
157 if (newBuffer == NULL) {
boz6ded7752007-06-05 22:41:18 +0000158 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 close();
160 return;
161 }
David Reiss472fffb2010-03-09 05:20:24 +0000162 readBuffer_ = newBuffer;
163 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 }
165
David Reiss105961d2010-10-06 17:10:17 +0000166 try {
167 // Read from the socket
168 fetch = readWant_ - readBufferPos_;
169 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
170 }
171 catch (TTransportException& te) {
172 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
173 close();
Mark Slee79b16942007-11-26 19:05:29 +0000174
David Reiss105961d2010-10-06 17:10:17 +0000175 return;
176 }
177
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 if (got > 0) {
179 // Move along in the buffer
180 readBufferPos_ += got;
181
182 // Check that we did not overdo it
183 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000184
Mark Slee2f6404d2006-10-10 01:37:40 +0000185 // We are done reading, move onto the next state
186 if (readBufferPos_ == readWant_) {
187 transition();
188 }
189 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 }
191
192 // Whenever we get down here it means a remote disconnect
193 close();
Mark Slee79b16942007-11-26 19:05:29 +0000194
Mark Slee2f6404d2006-10-10 01:37:40 +0000195 return;
196
197 case SOCKET_SEND:
198 // Should never have position past size
199 assert(writeBufferPos_ <= writeBufferSize_);
200
201 // If there is no data to send, then let us move on
202 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000203 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 transition();
205 return;
206 }
207
David Reiss105961d2010-10-06 17:10:17 +0000208 try {
209 left = writeBufferSize_ - writeBufferPos_;
210 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
211 }
212 catch (TTransportException& te) {
213 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 close();
215 return;
216 }
217
218 writeBufferPos_ += sent;
219
220 // Did we overdo it?
221 assert(writeBufferPos_ <= writeBufferSize_);
222
Mark Slee79b16942007-11-26 19:05:29 +0000223 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 if (writeBufferPos_ == writeBufferSize_) {
225 transition();
226 }
227
228 return;
229
230 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000231 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000232 assert(0);
233 }
234}
235
236/**
237 * This is called when the application transitions from one state into
238 * another. This means that it has finished writing the data that it needed
239 * to, or finished receiving the data that it needed to.
240 */
241void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000242
243 int sz = 0;
244
Mark Slee2f6404d2006-10-10 01:37:40 +0000245 // Switch upon the state that we are currently in and move to a new state
246 switch (appState_) {
247
248 case APP_READ_REQUEST:
249 // We are done reading the request, package the read buffer into transport
250 // and get back some data from the dispatch function
251 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000252 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000253 // Prepend four bytes of blank space to the buffer so we can
254 // write the frame size there later.
255 outputTransport_->getWritePtr(4);
256 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000257
David Reiss01fe1532010-03-09 05:19:25 +0000258 server_->incrementActiveProcessors();
259
Mark Sleee02385b2007-06-09 01:21:16 +0000260 if (server_->isThreadPoolProcessing()) {
261 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000262
David Reiss01fe1532010-03-09 05:19:25 +0000263 // Create task and dispatch to the thread manager
264 boost::shared_ptr<Runnable> task =
265 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
266 inputProtocol_,
267 outputProtocol_,
268 this));
269 // The application is now waiting on the task to finish
270 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000271
David Reisse11f3072008-10-07 21:39:19 +0000272 try {
273 server_->addTask(task);
274 } catch (IllegalStateException & ise) {
275 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000276 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000277 close();
278 }
Mark Slee402ee282007-08-23 01:43:20 +0000279
David Reiss01fe1532010-03-09 05:19:25 +0000280 // Set this connection idle so that libevent doesn't process more
281 // data on it while we're still waiting for the threadmanager to
282 // finish this task
283 setIdle();
284 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000285 } else {
286 try {
287 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000288 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Mark Sleee02385b2007-06-09 01:21:16 +0000289 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000290 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000291 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000292 close();
293 return;
294 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000295 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000296 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000297 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000298 return;
299 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000300 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000301 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000302 close();
303 return;
304 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000305 }
306
Mark Slee402ee282007-08-23 01:43:20 +0000307 // Intentionally fall through here, the call to process has written into
308 // the writeBuffer_
309
Mark Sleee02385b2007-06-09 01:21:16 +0000310 case APP_WAIT_TASK:
311 // We have now finished processing a task and the result has been written
312 // into the outputTransport_, so we grab its contents and place them into
313 // the writeBuffer_ for actual writing by the libevent thread
314
David Reiss01fe1532010-03-09 05:19:25 +0000315 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000316 // Get the result of the operation
317 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
318
319 // If the function call generated return data, then move into the send
320 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000321 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000322 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000323
324 // Move into write state
325 writeBufferPos_ = 0;
326 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000327
David Reissaf787782008-07-03 20:29:34 +0000328 // Put the frame size into the write buffer
329 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
330 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000331
332 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000333 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000334 setWrite();
335
336 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000337 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000338
339 return;
340 }
341
David Reissc51986f2009-03-24 20:01:25 +0000342 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000344 goto LABEL_APP_INIT;
345
Mark Slee2f6404d2006-10-10 01:37:40 +0000346 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000347 // it's now safe to perform buffer size housekeeping.
348 if (writeBufferSize_ > largestWriteBufferSize_) {
349 largestWriteBufferSize_ = writeBufferSize_;
350 }
351 if (server_->getResizeBufferEveryN() > 0
352 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
353 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
354 server_->getIdleWriteBufferLimit());
355 callsForResize_ = 0;
356 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000357
358 // N.B.: We also intentionally fall through here into the INIT state!
359
Mark Slee92f00fb2006-10-25 01:28:17 +0000360 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 case APP_INIT:
362
363 // Clear write buffer variables
364 writeBuffer_ = NULL;
365 writeBufferPos_ = 0;
366 writeBufferSize_ = 0;
367
368 // Set up read buffer for getting 4 bytes
369 readBufferPos_ = 0;
370 readWant_ = 4;
371
372 // Into read4 state we go
373 socketState_ = SOCKET_RECV;
374 appState_ = APP_READ_FRAME_SIZE;
375
376 // Register read event
377 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000378
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000380 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000381
382 return;
383
384 case APP_READ_FRAME_SIZE:
385 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000386 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 sz = (int32_t)ntohl(sz);
388
389 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000390 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000391 close();
392 return;
393 }
394
395 // Reset the read buffer
396 readWant_ = (uint32_t)sz;
397 readBufferPos_= 0;
398
399 // Move into read request state
400 appState_ = APP_READ_REQUEST;
401
402 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000403 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000404
405 return;
406
David Reiss01fe1532010-03-09 05:19:25 +0000407 case APP_CLOSE_CONNECTION:
408 server_->decrementActiveProcessors();
409 close();
410 return;
411
Mark Slee2f6404d2006-10-10 01:37:40 +0000412 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000413 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000414 assert(0);
415 }
416}
417
418void TConnection::setFlags(short eventFlags) {
419 // Catch the do nothing case
420 if (eventFlags_ == eventFlags) {
421 return;
422 }
423
424 // Delete a previously existing event
425 if (eventFlags_ != 0) {
426 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000427 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000428 return;
429 }
430 }
431
432 // Update in memory structure
433 eventFlags_ = eventFlags;
434
Mark Slee402ee282007-08-23 01:43:20 +0000435 // Do not call event_set if there are no flags
436 if (!eventFlags_) {
437 return;
438 }
439
David Reiss01fe1532010-03-09 05:19:25 +0000440 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000441 * event_set:
442 *
443 * Prepares the event structure &event to be used in future calls to
444 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000445 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000446 *
447 * The events can be either EV_READ, EV_WRITE, or both, indicating
448 * that an application can read or write from the file respectively without
449 * blocking.
450 *
Mark Sleee02385b2007-06-09 01:21:16 +0000451 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 * the event and the type of event which will be one of: EV_TIMEOUT,
453 * EV_SIGNAL, EV_READ, EV_WRITE.
454 *
455 * The additional flag EV_PERSIST makes an event_add() persistent until
456 * event_del() has been called.
457 *
458 * Once initialized, the &event struct can be used repeatedly with
459 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000460 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 * when an ev structure has been added to libevent using event_add() the
462 * structure must persist until the event occurs (assuming EV_PERSIST
463 * is not set) or is removed using event_del(). You may not reuse the same
464 * ev structure for multiple monitored descriptors; each descriptor needs
465 * its own ev.
466 */
David Reiss105961d2010-10-06 17:10:17 +0000467 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
468 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000469 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000470
471 // Add the event
472 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000473 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000474 }
475}
476
477/**
478 * Closes a connection
479 */
480void TConnection::close() {
481 // Delete the registered libevent
482 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000483 GlobalOutput.perror("TConnection::close() event_del", errno);
484 }
485
486 if (serverEventHandler_ != NULL) {
487 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000488 }
489
490 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000491 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000492
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000493 // close any factory produced transports
494 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000495 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000496
Mark Slee2f6404d2006-10-10 01:37:40 +0000497 // Give this object back to the server that owns it
498 server_->returnConnection(this);
499}
500
David Reiss54bec5d2010-10-06 17:10:45 +0000501void TConnection::checkIdleBufferMemLimit(size_t readLimit,
502 size_t writeLimit) {
503 if (readLimit > 0 && readBufferSize_ > readLimit) {
504 readBufferSize_ = readLimit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000505 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
506 if (readBuffer_ == NULL) {
507 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
508 close();
509 }
510 }
David Reiss54bec5d2010-10-06 17:10:45 +0000511
512 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
513 // just start over
514 outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
515 largestWriteBufferSize_ = 0;
516 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000517}
518
David Reiss8ede8182010-09-02 15:26:28 +0000519TNonblockingServer::~TNonblockingServer() {
520 // TODO: We currently leak any active TConnection objects.
521 // Since we're shutting down and destroying the event_base, the TConnection
522 // objects will never receive any additional callbacks. (And even if they
523 // did, it would be bad, since they keep a pointer around to the server,
524 // which is being destroyed.)
525
526 // Clean up unused TConnection objects in connectionStack_
527 while (!connectionStack_.empty()) {
528 TConnection* connection = connectionStack_.top();
529 connectionStack_.pop();
530 delete connection;
531 }
532
533 if (eventBase_) {
534 event_base_free(eventBase_);
535 }
536
537 if (serverSocket_ >= 0) {
538 close(serverSocket_);
539 }
540}
541
Mark Slee2f6404d2006-10-10 01:37:40 +0000542/**
543 * Creates a new connection either by reusing an object off the stack or
544 * by allocating a new one entirely
545 */
David Reiss105961d2010-10-06 17:10:17 +0000546TConnection* TNonblockingServer::createConnection(int socket, short flags,
547 const sockaddr* addr,
548 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000549 // Check the stack
550 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000551 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000552 } else {
553 TConnection* result = connectionStack_.top();
554 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000555 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000556 return result;
557 }
558}
559
560/**
561 * Returns a connection to the stack
562 */
563void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000564 if (connectionStackLimit_ &&
565 (connectionStack_.size() >= connectionStackLimit_)) {
566 delete connection;
567 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000568 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000569 connectionStack_.push(connection);
570 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000571}
572
573/**
David Reissa79e4882008-03-05 07:51:47 +0000574 * Server socket had something happen. We accept all waiting client
575 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000576 */
577void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000578 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000579 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000580
Mark Slee2f6404d2006-10-10 01:37:40 +0000581 // Server socket accepted a new connection
582 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000583 sockaddr_storage addrStorage;
584 sockaddr* addrp = (sockaddr*)&addrStorage;
585 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000586
Mark Slee2f6404d2006-10-10 01:37:40 +0000587 // Going to accept a new client socket
588 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000589
Mark Slee2f6404d2006-10-10 01:37:40 +0000590 // Accept as many new clients as possible, even though libevent signaled only
591 // one, this helps us to avoid having to go back into the libevent engine so
592 // many times
David Reiss105961d2010-10-06 17:10:17 +0000593 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000594 // If we're overloaded, take action here
595 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
596 nConnectionsDropped_++;
597 nTotalConnectionsDropped_++;
598 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
599 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000600 return;
David Reiss01fe1532010-03-09 05:19:25 +0000601 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
602 if (!drainPendingTask()) {
603 // Nothing left to discard, so we drop connection instead.
604 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000605 return;
David Reiss01fe1532010-03-09 05:19:25 +0000606 }
607 }
608 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000609 // Explicitly set this socket to NONBLOCK mode
610 int flags;
611 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
612 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000613 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000614 close(clientSocket);
615 return;
616 }
617
618 // Create a new TConnection for this client socket.
619 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000620 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000621
622 // Fail fast if we could not create a TConnection object
623 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000624 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000625 close(clientSocket);
626 return;
627 }
628
629 // Put this client connection into the proper state
630 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000631
632 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000633 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000634 }
Mark Slee79b16942007-11-26 19:05:29 +0000635
Mark Slee2f6404d2006-10-10 01:37:40 +0000636 // Done looping accept, now we have to make sure the error is due to
637 // blocking. Any other error is a problem
638 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000639 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000640 }
641}
642
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * The wildcard address is resolved with getaddrinfo(); an IPv6 result is
 * preferred when present. The bound socket is then passed to
 * listenSocket(int) to be prepared for listening.
 *
 * @throws TException if address resolution, socket creation or bind fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Bounded formatting: an out-of-range port_ value must not overrun port[]
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Without an address there is no socket to listen on; report it the same
    // way as the other failures below instead of returning silently.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY on the IPv6 socket; a failure is only logged
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
707
708/**
709 * Takes a socket created by listenSocket() and sets various options on it
710 * to prepare for use in the server.
711 */
712void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000713 // Set socket to nonblocking mode
714 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000715 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
716 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
717 close(s);
718 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000719 }
720
721 int one = 1;
722 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000723
724 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000725 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000726
727 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000728 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000729
730 // Set TCP nodelay if available, MAC OS X Hack
731 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
732 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000733 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000734 #endif
735
David Reiss1c20c872010-03-09 05:20:14 +0000736 #ifdef TCP_LOW_MIN_RTO
737 if (TSocket::getUseLowMinRto()) {
738 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
739 }
740 #endif
741
Mark Slee79b16942007-11-26 19:05:29 +0000742 if (listen(s, LISTEN_BACKLOG) == -1) {
743 close(s);
744 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000745 }
746
Mark Slee79b16942007-11-26 19:05:29 +0000747 // Cool, this socket is good to go, set it as the serverSocket_
748 serverSocket_ = s;
749}
750
David Reiss01fe1532010-03-09 05:19:25 +0000751void TNonblockingServer::createNotificationPipe() {
752 if (pipe(notificationPipeFDs_) != 0) {
753 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
754 throw TException("can't create notification pipe");
755 }
David Reiss83b8fda2010-03-09 05:19:34 +0000756 int flags;
757 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
758 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
759 close(notificationPipeFDs_[0]);
760 close(notificationPipeFDs_[1]);
761 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
762 }
David Reiss01fe1532010-03-09 05:19:25 +0000763}
764
Mark Slee79b16942007-11-26 19:05:29 +0000765/**
766 * Register the core libevent events onto the proper base.
767 */
768void TNonblockingServer::registerEvents(event_base* base) {
769 assert(serverSocket_ != -1);
770 assert(!eventBase_);
771 eventBase_ = base;
772
773 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000774 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000775 event_get_version(),
776 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000777
778 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000779 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000780 serverSocket_,
781 EV_READ | EV_PERSIST,
782 TNonblockingServer::eventHandler,
783 this);
Mark Slee79b16942007-11-26 19:05:29 +0000784 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000785
786 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000787 if (-1 == event_add(&serverEvent_, 0)) {
788 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000789 }
David Reiss01fe1532010-03-09 05:19:25 +0000790 if (threadPoolProcessing_) {
791 // Create an event to be notified when a task finishes
792 event_set(&notificationEvent_,
793 getNotificationRecvFD(),
794 EV_READ | EV_PERSIST,
795 TConnection::taskHandler,
796 this);
David Reiss1c20c872010-03-09 05:20:14 +0000797
David Reiss01fe1532010-03-09 05:19:25 +0000798 // Attach to the base
799 event_base_set(eventBase_, &notificationEvent_);
800
801 // Add the event and start up the server
802 if (-1 == event_add(&notificationEvent_, 0)) {
803 throw TException("TNonblockingServer::serve(): notification event_add fail");
804 }
805 }
806}
807
David Reiss068f4162010-03-09 05:19:45 +0000808void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
809 threadManager_ = threadManager;
810 if (threadManager != NULL) {
811 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
812 threadPoolProcessing_ = true;
813 } else {
814 threadPoolProcessing_ = false;
815 }
816}
817
David Reiss01fe1532010-03-09 05:19:25 +0000818bool TNonblockingServer::serverOverloaded() {
819 size_t activeConnections = numTConnections_ - connectionStack_.size();
820 if (numActiveProcessors_ > maxActiveProcessors_ ||
821 activeConnections > maxConnections_) {
822 if (!overloaded_) {
823 GlobalOutput.printf("thrift non-blocking server overload condition");
824 overloaded_ = true;
825 }
826 } else {
827 if (overloaded_ &&
828 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
829 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
830 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
831 nConnectionsDropped_, nTotalConnectionsDropped_);
832 nConnectionsDropped_ = 0;
833 overloaded_ = false;
834 }
835 }
836
837 return overloaded_;
838}
839
840bool TNonblockingServer::drainPendingTask() {
841 if (threadManager_) {
842 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
843 if (task) {
844 TConnection* connection =
845 static_cast<TConnection::Task*>(task.get())->getTConnection();
846 assert(connection && connection->getServer()
847 && connection->getState() == APP_WAIT_TASK);
848 connection->forceClose();
849 return true;
850 }
851 }
852 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000853}
854
David Reiss068f4162010-03-09 05:19:45 +0000855void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
856 TConnection* connection =
857 static_cast<TConnection::Task*>(task.get())->getTConnection();
858 assert(connection && connection->getServer()
859 && connection->getState() == APP_WAIT_TASK);
860 connection->forceClose();
861}
862
Mark Slee79b16942007-11-26 19:05:29 +0000863/**
864 * Main workhorse function, starts up the server listening on a port and
865 * loops over the libevent handler.
866 */
867void TNonblockingServer::serve() {
868 // Init socket
869 listenSocket();
870
David Reiss01fe1532010-03-09 05:19:25 +0000871 if (threadPoolProcessing_) {
872 // Init task completion notification pipe
873 createNotificationPipe();
874 }
875
Mark Slee79b16942007-11-26 19:05:29 +0000876 // Initialize libevent core
877 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000878
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000879 // Run the preServe event
880 if (eventHandler_ != NULL) {
881 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000882 }
883
Mark Sleee02385b2007-06-09 01:21:16 +0000884 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000885 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000886}
887
T Jake Lucianib5e62212009-01-31 22:36:20 +0000888}}} // apache::thrift::server