blob: 73edd93852ed56f80f8b09bc2f3068e5c06be151 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000055 connection_(connection),
56 serverEventHandler_(connection_->getServerEventHandler()),
57 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000058
59 void run() {
60 try {
David Reiss105961d2010-10-06 17:10:17 +000061 for (;;) {
62 if (serverEventHandler_ != NULL) {
63 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
64 }
65 if (!processor_->process(input_, output_, connectionContext_) ||
66 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000067 break;
68 }
69 }
70 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000071 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000072 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000073 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000074 } catch (bad_alloc&) {
75 cerr << "TNonblockingServer caught bad_alloc exception.";
76 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000077 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000078 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000079 }
Mark Slee79b16942007-11-26 19:05:29 +000080
David Reiss01fe1532010-03-09 05:19:25 +000081 // Signal completion back to the libevent thread via a pipe
82 if (!connection_->notifyServer()) {
83 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000084 }
David Reiss01fe1532010-03-09 05:19:25 +000085 }
86
87 TConnection* getTConnection() {
88 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000089 }
90
91 private:
92 boost::shared_ptr<TProcessor> processor_;
93 boost::shared_ptr<TProtocol> input_;
94 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000095 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +000096 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
97 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +000098};
Mark Slee5ea15f92007-03-05 22:55:59 +000099
David Reiss105961d2010-10-06 17:10:17 +0000100void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
101 const sockaddr* addr, socklen_t addrLen) {
102 tSocket_->setSocketFD(socket);
103 tSocket_->setCachedAddress(addr, addrLen);
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 server_ = s;
106 appState_ = APP_INIT;
107 eventFlags_ = 0;
108
109 readBufferPos_ = 0;
110 readWant_ = 0;
111
112 writeBuffer_ = NULL;
113 writeBufferSize_ = 0;
114 writeBufferPos_ = 0;
115
116 socketState_ = SOCKET_RECV;
117 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000118
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 // Set flags, which also registers the event
120 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000121
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122 // get input/transports
123 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
124 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000125
126 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000127 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
128 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000129
130 // Set up for any server event handler
131 serverEventHandler_ = server_->getEventHandler();
132 if (serverEventHandler_ != NULL) {
133 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
134 } else {
135 connectionContext_ = NULL;
136 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000137}
138
139void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000140 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000141 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000142
143 switch (socketState_) {
144 case SOCKET_RECV:
145 // It is an error to be in this state if we already have all the data
146 assert(readBufferPos_ < readWant_);
147
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000149 if (readWant_ > readBufferSize_) {
David Reiss472fffb2010-03-09 05:20:24 +0000150 uint32_t newSize = readBufferSize_;
151 while (readWant_ > newSize) {
152 newSize *= 2;
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 }
David Reiss472fffb2010-03-09 05:20:24 +0000154 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
155 if (newBuffer == NULL) {
boz6ded7752007-06-05 22:41:18 +0000156 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000157 close();
158 return;
159 }
David Reiss472fffb2010-03-09 05:20:24 +0000160 readBuffer_ = newBuffer;
161 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 }
163
David Reiss105961d2010-10-06 17:10:17 +0000164 try {
165 // Read from the socket
166 fetch = readWant_ - readBufferPos_;
167 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
168 }
169 catch (TTransportException& te) {
170 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
171 close();
Mark Slee79b16942007-11-26 19:05:29 +0000172
David Reiss105961d2010-10-06 17:10:17 +0000173 return;
174 }
175
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 if (got > 0) {
177 // Move along in the buffer
178 readBufferPos_ += got;
179
180 // Check that we did not overdo it
181 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000182
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 // We are done reading, move onto the next state
184 if (readBufferPos_ == readWant_) {
185 transition();
186 }
187 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000188 }
189
190 // Whenever we get down here it means a remote disconnect
191 close();
Mark Slee79b16942007-11-26 19:05:29 +0000192
Mark Slee2f6404d2006-10-10 01:37:40 +0000193 return;
194
195 case SOCKET_SEND:
196 // Should never have position past size
197 assert(writeBufferPos_ <= writeBufferSize_);
198
199 // If there is no data to send, then let us move on
200 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000201 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 transition();
203 return;
204 }
205
David Reiss105961d2010-10-06 17:10:17 +0000206 try {
207 left = writeBufferSize_ - writeBufferPos_;
208 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
209 }
210 catch (TTransportException& te) {
211 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 close();
213 return;
214 }
215
216 writeBufferPos_ += sent;
217
218 // Did we overdo it?
219 assert(writeBufferPos_ <= writeBufferSize_);
220
Mark Slee79b16942007-11-26 19:05:29 +0000221 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 if (writeBufferPos_ == writeBufferSize_) {
223 transition();
224 }
225
226 return;
227
228 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000229 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 assert(0);
231 }
232}
233
234/**
235 * This is called when the application transitions from one state into
236 * another. This means that it has finished writing the data that it needed
237 * to, or finished receiving the data that it needed to.
238 */
239void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000240
241 int sz = 0;
242
Mark Slee2f6404d2006-10-10 01:37:40 +0000243 // Switch upon the state that we are currently in and move to a new state
244 switch (appState_) {
245
246 case APP_READ_REQUEST:
247 // We are done reading the request, package the read buffer into transport
248 // and get back some data from the dispatch function
249 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000250 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000251 // Prepend four bytes of blank space to the buffer so we can
252 // write the frame size there later.
253 outputTransport_->getWritePtr(4);
254 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000255
David Reiss01fe1532010-03-09 05:19:25 +0000256 server_->incrementActiveProcessors();
257
Mark Sleee02385b2007-06-09 01:21:16 +0000258 if (server_->isThreadPoolProcessing()) {
259 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000260
David Reiss01fe1532010-03-09 05:19:25 +0000261 // Create task and dispatch to the thread manager
262 boost::shared_ptr<Runnable> task =
263 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
264 inputProtocol_,
265 outputProtocol_,
266 this));
267 // The application is now waiting on the task to finish
268 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000269
David Reisse11f3072008-10-07 21:39:19 +0000270 try {
271 server_->addTask(task);
272 } catch (IllegalStateException & ise) {
273 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000274 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000275 close();
276 }
Mark Slee402ee282007-08-23 01:43:20 +0000277
David Reiss01fe1532010-03-09 05:19:25 +0000278 // Set this connection idle so that libevent doesn't process more
279 // data on it while we're still waiting for the threadmanager to
280 // finish this task
281 setIdle();
282 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000283 } else {
284 try {
285 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000286 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Mark Sleee02385b2007-06-09 01:21:16 +0000287 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000288 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000289 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000290 close();
291 return;
292 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000293 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000294 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000295 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000296 return;
297 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000298 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000299 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000300 close();
301 return;
302 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000303 }
304
Mark Slee402ee282007-08-23 01:43:20 +0000305 // Intentionally fall through here, the call to process has written into
306 // the writeBuffer_
307
Mark Sleee02385b2007-06-09 01:21:16 +0000308 case APP_WAIT_TASK:
309 // We have now finished processing a task and the result has been written
310 // into the outputTransport_, so we grab its contents and place them into
311 // the writeBuffer_ for actual writing by the libevent thread
312
David Reiss01fe1532010-03-09 05:19:25 +0000313 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000314 // Get the result of the operation
315 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
316
317 // If the function call generated return data, then move into the send
318 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000319 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000320 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000321
322 // Move into write state
323 writeBufferPos_ = 0;
324 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000325
David Reissaf787782008-07-03 20:29:34 +0000326 // Put the frame size into the write buffer
327 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
328 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000329
330 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000331 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000332 setWrite();
333
334 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000335 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000336
337 return;
338 }
339
David Reissc51986f2009-03-24 20:01:25 +0000340 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000341 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000342 goto LABEL_APP_INIT;
343
Mark Slee2f6404d2006-10-10 01:37:40 +0000344 case APP_SEND_RESULT:
345
346 // N.B.: We also intentionally fall through here into the INIT state!
347
Mark Slee92f00fb2006-10-25 01:28:17 +0000348 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000349 case APP_INIT:
350
351 // Clear write buffer variables
352 writeBuffer_ = NULL;
353 writeBufferPos_ = 0;
354 writeBufferSize_ = 0;
355
356 // Set up read buffer for getting 4 bytes
357 readBufferPos_ = 0;
358 readWant_ = 4;
359
360 // Into read4 state we go
361 socketState_ = SOCKET_RECV;
362 appState_ = APP_READ_FRAME_SIZE;
363
364 // Register read event
365 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000366
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000368 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000369
370 return;
371
372 case APP_READ_FRAME_SIZE:
373 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000374 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000375 sz = (int32_t)ntohl(sz);
376
377 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000378 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 close();
380 return;
381 }
382
383 // Reset the read buffer
384 readWant_ = (uint32_t)sz;
385 readBufferPos_= 0;
386
387 // Move into read request state
388 appState_ = APP_READ_REQUEST;
389
390 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000391 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000392
393 return;
394
David Reiss01fe1532010-03-09 05:19:25 +0000395 case APP_CLOSE_CONNECTION:
396 server_->decrementActiveProcessors();
397 close();
398 return;
399
Mark Slee2f6404d2006-10-10 01:37:40 +0000400 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000401 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 assert(0);
403 }
404}
405
406void TConnection::setFlags(short eventFlags) {
407 // Catch the do nothing case
408 if (eventFlags_ == eventFlags) {
409 return;
410 }
411
412 // Delete a previously existing event
413 if (eventFlags_ != 0) {
414 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000415 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000416 return;
417 }
418 }
419
420 // Update in memory structure
421 eventFlags_ = eventFlags;
422
Mark Slee402ee282007-08-23 01:43:20 +0000423 // Do not call event_set if there are no flags
424 if (!eventFlags_) {
425 return;
426 }
427
David Reiss01fe1532010-03-09 05:19:25 +0000428 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000429 * event_set:
430 *
431 * Prepares the event structure &event to be used in future calls to
432 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000433 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000434 *
435 * The events can be either EV_READ, EV_WRITE, or both, indicating
436 * that an application can read or write from the file respectively without
437 * blocking.
438 *
Mark Sleee02385b2007-06-09 01:21:16 +0000439 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000440 * the event and the type of event which will be one of: EV_TIMEOUT,
441 * EV_SIGNAL, EV_READ, EV_WRITE.
442 *
443 * The additional flag EV_PERSIST makes an event_add() persistent until
444 * event_del() has been called.
445 *
446 * Once initialized, the &event struct can be used repeatedly with
447 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000448 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000449 * when an ev structure has been added to libevent using event_add() the
450 * structure must persist until the event occurs (assuming EV_PERSIST
451 * is not set) or is removed using event_del(). You may not reuse the same
452 * ev structure for multiple monitored descriptors; each descriptor needs
453 * its own ev.
454 */
David Reiss105961d2010-10-06 17:10:17 +0000455 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
456 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000457 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000458
459 // Add the event
460 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000461 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000462 }
463}
464
465/**
466 * Closes a connection
467 */
468void TConnection::close() {
469 // Delete the registered libevent
470 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000471 GlobalOutput.perror("TConnection::close() event_del", errno);
472 }
473
474 if (serverEventHandler_ != NULL) {
475 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000476 }
477
478 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000479 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000480
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000481 // close any factory produced transports
482 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000483 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000484
Mark Slee2f6404d2006-10-10 01:37:40 +0000485 // Give this object back to the server that owns it
486 server_->returnConnection(this);
487}
488
David Reiss01fe1532010-03-09 05:19:25 +0000489void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000490 if (readBufferSize_ > limit) {
491 readBufferSize_ = limit;
492 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
493 if (readBuffer_ == NULL) {
494 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
495 close();
496 }
497 }
498}
499
David Reiss8ede8182010-09-02 15:26:28 +0000500TNonblockingServer::~TNonblockingServer() {
501 // TODO: We currently leak any active TConnection objects.
502 // Since we're shutting down and destroying the event_base, the TConnection
503 // objects will never receive any additional callbacks. (And even if they
504 // did, it would be bad, since they keep a pointer around to the server,
505 // which is being destroyed.)
506
507 // Clean up unused TConnection objects in connectionStack_
508 while (!connectionStack_.empty()) {
509 TConnection* connection = connectionStack_.top();
510 connectionStack_.pop();
511 delete connection;
512 }
513
514 if (eventBase_) {
515 event_base_free(eventBase_);
516 }
517
518 if (serverSocket_ >= 0) {
519 close(serverSocket_);
520 }
521}
522
Mark Slee2f6404d2006-10-10 01:37:40 +0000523/**
524 * Creates a new connection either by reusing an object off the stack or
525 * by allocating a new one entirely
526 */
David Reiss105961d2010-10-06 17:10:17 +0000527TConnection* TNonblockingServer::createConnection(int socket, short flags,
528 const sockaddr* addr,
529 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 // Check the stack
531 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000532 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000533 } else {
534 TConnection* result = connectionStack_.top();
535 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000536 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000537 return result;
538 }
539}
540
541/**
542 * Returns a connection to the stack
543 */
544void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000545 if (connectionStackLimit_ &&
546 (connectionStack_.size() >= connectionStackLimit_)) {
547 delete connection;
548 } else {
549 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
550 connectionStack_.push(connection);
551 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000552}
553
554/**
David Reissa79e4882008-03-05 07:51:47 +0000555 * Server socket had something happen. We accept all waiting client
556 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000557 */
558void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000559 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000560 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000561
Mark Slee2f6404d2006-10-10 01:37:40 +0000562 // Server socket accepted a new connection
563 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000564 sockaddr_storage addrStorage;
565 sockaddr* addrp = (sockaddr*)&addrStorage;
566 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000567
Mark Slee2f6404d2006-10-10 01:37:40 +0000568 // Going to accept a new client socket
569 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000570
Mark Slee2f6404d2006-10-10 01:37:40 +0000571 // Accept as many new clients as possible, even though libevent signaled only
572 // one, this helps us to avoid having to go back into the libevent engine so
573 // many times
David Reiss105961d2010-10-06 17:10:17 +0000574 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000575 // If we're overloaded, take action here
576 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
577 nConnectionsDropped_++;
578 nTotalConnectionsDropped_++;
579 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
580 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000581 return;
David Reiss01fe1532010-03-09 05:19:25 +0000582 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
583 if (!drainPendingTask()) {
584 // Nothing left to discard, so we drop connection instead.
585 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000586 return;
David Reiss01fe1532010-03-09 05:19:25 +0000587 }
588 }
589 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000590 // Explicitly set this socket to NONBLOCK mode
591 int flags;
592 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
593 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000594 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000595 close(clientSocket);
596 return;
597 }
598
599 // Create a new TConnection for this client socket.
600 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000601 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000602
603 // Fail fast if we could not create a TConnection object
604 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000605 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000606 close(clientSocket);
607 return;
608 }
609
610 // Put this client connection into the proper state
611 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000612
613 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000614 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000615 }
Mark Slee79b16942007-11-26 19:05:29 +0000616
Mark Slee2f6404d2006-10-10 01:37:40 +0000617 // Done looping accept, now we have to make sure the error is due to
618 // blocking. Any other error is a problem
619 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000620 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000621 }
622}
623
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_ with getaddrinfo(). It prefers an
 * IPv6 result (IPv4 can be mapped into IPv6 space) and otherwise uses the
 * last result. It then creates and binds a stream socket with SO_REUSEADDR
 * and hands it to listenSocket(int), which applies the remaining options,
 * calls listen(), and stores the descriptor in serverSocket_.
 *
 * @throws TException if address resolution, socket() or bind() fails, or if
 *         listenSocket(int) fails.
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // Sized for the longest int (sign included); snprintf bounds the write
  // regardless of what value port_ holds.
  char port[sizeof("-2147483648")];
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  struct addrinfo* res0 = NULL;
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail like every other error path in this function. Returning quietly
    // would leave the server without a listening socket, which
    // registerEvents() asserts against.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  struct addrinfo* res;
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Accept IPv4-mapped connections on the IPv6 socket as well.
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TNonblockingServer::listenSocket() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
688
689/**
690 * Takes a socket created by listenSocket() and sets various options on it
691 * to prepare for use in the server.
692 */
693void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 // Set socket to nonblocking mode
695 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000696 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
697 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
698 close(s);
699 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 }
701
702 int one = 1;
703 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000704
705 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000706 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000707
708 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000709 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000710
711 // Set TCP nodelay if available, MAC OS X Hack
712 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
713 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000714 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000715 #endif
716
David Reiss1c20c872010-03-09 05:20:14 +0000717 #ifdef TCP_LOW_MIN_RTO
718 if (TSocket::getUseLowMinRto()) {
719 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
720 }
721 #endif
722
Mark Slee79b16942007-11-26 19:05:29 +0000723 if (listen(s, LISTEN_BACKLOG) == -1) {
724 close(s);
725 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000726 }
727
Mark Slee79b16942007-11-26 19:05:29 +0000728 // Cool, this socket is good to go, set it as the serverSocket_
729 serverSocket_ = s;
730}
731
David Reiss01fe1532010-03-09 05:19:25 +0000732void TNonblockingServer::createNotificationPipe() {
733 if (pipe(notificationPipeFDs_) != 0) {
734 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
735 throw TException("can't create notification pipe");
736 }
David Reiss83b8fda2010-03-09 05:19:34 +0000737 int flags;
738 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
739 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
740 close(notificationPipeFDs_[0]);
741 close(notificationPipeFDs_[1]);
742 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
743 }
David Reiss01fe1532010-03-09 05:19:25 +0000744}
745
Mark Slee79b16942007-11-26 19:05:29 +0000746/**
747 * Register the core libevent events onto the proper base.
748 */
749void TNonblockingServer::registerEvents(event_base* base) {
750 assert(serverSocket_ != -1);
751 assert(!eventBase_);
752 eventBase_ = base;
753
754 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000755 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000756 event_get_version(),
757 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000758
759 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000760 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 serverSocket_,
762 EV_READ | EV_PERSIST,
763 TNonblockingServer::eventHandler,
764 this);
Mark Slee79b16942007-11-26 19:05:29 +0000765 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000766
767 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000768 if (-1 == event_add(&serverEvent_, 0)) {
769 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 }
David Reiss01fe1532010-03-09 05:19:25 +0000771 if (threadPoolProcessing_) {
772 // Create an event to be notified when a task finishes
773 event_set(&notificationEvent_,
774 getNotificationRecvFD(),
775 EV_READ | EV_PERSIST,
776 TConnection::taskHandler,
777 this);
David Reiss1c20c872010-03-09 05:20:14 +0000778
David Reiss01fe1532010-03-09 05:19:25 +0000779 // Attach to the base
780 event_base_set(eventBase_, &notificationEvent_);
781
782 // Add the event and start up the server
783 if (-1 == event_add(&notificationEvent_, 0)) {
784 throw TException("TNonblockingServer::serve(): notification event_add fail");
785 }
786 }
787}
788
David Reiss068f4162010-03-09 05:19:45 +0000789void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
790 threadManager_ = threadManager;
791 if (threadManager != NULL) {
792 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
793 threadPoolProcessing_ = true;
794 } else {
795 threadPoolProcessing_ = false;
796 }
797}
798
David Reiss01fe1532010-03-09 05:19:25 +0000799bool TNonblockingServer::serverOverloaded() {
800 size_t activeConnections = numTConnections_ - connectionStack_.size();
801 if (numActiveProcessors_ > maxActiveProcessors_ ||
802 activeConnections > maxConnections_) {
803 if (!overloaded_) {
804 GlobalOutput.printf("thrift non-blocking server overload condition");
805 overloaded_ = true;
806 }
807 } else {
808 if (overloaded_ &&
809 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
810 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
811 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
812 nConnectionsDropped_, nTotalConnectionsDropped_);
813 nConnectionsDropped_ = 0;
814 overloaded_ = false;
815 }
816 }
817
818 return overloaded_;
819}
820
821bool TNonblockingServer::drainPendingTask() {
822 if (threadManager_) {
823 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
824 if (task) {
825 TConnection* connection =
826 static_cast<TConnection::Task*>(task.get())->getTConnection();
827 assert(connection && connection->getServer()
828 && connection->getState() == APP_WAIT_TASK);
829 connection->forceClose();
830 return true;
831 }
832 }
833 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000834}
835
David Reiss068f4162010-03-09 05:19:45 +0000836void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
837 TConnection* connection =
838 static_cast<TConnection::Task*>(task.get())->getTConnection();
839 assert(connection && connection->getServer()
840 && connection->getState() == APP_WAIT_TASK);
841 connection->forceClose();
842}
843
Mark Slee79b16942007-11-26 19:05:29 +0000844/**
845 * Main workhorse function, starts up the server listening on a port and
846 * loops over the libevent handler.
847 */
848void TNonblockingServer::serve() {
849 // Init socket
850 listenSocket();
851
David Reiss01fe1532010-03-09 05:19:25 +0000852 if (threadPoolProcessing_) {
853 // Init task completion notification pipe
854 createNotificationPipe();
855 }
856
Mark Slee79b16942007-11-26 19:05:29 +0000857 // Initialize libevent core
858 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000859
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000860 // Run the preServe event
861 if (eventHandler_ != NULL) {
862 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000863 }
864
Mark Sleee02385b2007-06-09 01:21:16 +0000865 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000866 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000867}
868
T Jake Lucianib5e62212009-01-31 22:36:20 +0000869}}} // apache::thrift::server