blob: 62970308fc6c7776cf5cfbb333c154bd0e4b896d [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000055 connection_(connection),
56 serverEventHandler_(connection_->getServerEventHandler()),
57 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000058
59 void run() {
60 try {
David Reiss105961d2010-10-06 17:10:17 +000061 for (;;) {
62 if (serverEventHandler_ != NULL) {
63 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
64 }
65 if (!processor_->process(input_, output_, connectionContext_) ||
66 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000067 break;
68 }
69 }
70 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000071 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000072 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000073 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000074 } catch (bad_alloc&) {
75 cerr << "TNonblockingServer caught bad_alloc exception.";
76 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000077 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000078 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000079 }
Mark Slee79b16942007-11-26 19:05:29 +000080
David Reiss01fe1532010-03-09 05:19:25 +000081 // Signal completion back to the libevent thread via a pipe
82 if (!connection_->notifyServer()) {
83 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000084 }
David Reiss01fe1532010-03-09 05:19:25 +000085 }
86
87 TConnection* getTConnection() {
88 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000089 }
90
91 private:
92 boost::shared_ptr<TProcessor> processor_;
93 boost::shared_ptr<TProtocol> input_;
94 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000095 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +000096 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
97 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +000098};
Mark Slee5ea15f92007-03-05 22:55:59 +000099
David Reiss105961d2010-10-06 17:10:17 +0000100void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
101 const sockaddr* addr, socklen_t addrLen) {
102 tSocket_->setSocketFD(socket);
103 tSocket_->setCachedAddress(addr, addrLen);
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 server_ = s;
106 appState_ = APP_INIT;
107 eventFlags_ = 0;
108
109 readBufferPos_ = 0;
110 readWant_ = 0;
111
112 writeBuffer_ = NULL;
113 writeBufferSize_ = 0;
114 writeBufferPos_ = 0;
115
116 socketState_ = SOCKET_RECV;
117 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000118
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 // Set flags, which also registers the event
120 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000121
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122 // get input/transports
123 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
124 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000125
126 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000127 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
128 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000129
130 // Set up for any server event handler
131 serverEventHandler_ = server_->getEventHandler();
132 if (serverEventHandler_ != NULL) {
133 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
134 } else {
135 connectionContext_ = NULL;
136 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000137}
138
139void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000140 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000141 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000142
143 switch (socketState_) {
144 case SOCKET_RECV:
145 // It is an error to be in this state if we already have all the data
146 assert(readBufferPos_ < readWant_);
147
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000149 if (readWant_ > readBufferSize_) {
David Reiss472fffb2010-03-09 05:20:24 +0000150 uint32_t newSize = readBufferSize_;
151 while (readWant_ > newSize) {
152 newSize *= 2;
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 }
David Reiss472fffb2010-03-09 05:20:24 +0000154 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
155 if (newBuffer == NULL) {
boz6ded7752007-06-05 22:41:18 +0000156 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000157 close();
158 return;
159 }
David Reiss472fffb2010-03-09 05:20:24 +0000160 readBuffer_ = newBuffer;
161 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 }
163
David Reiss105961d2010-10-06 17:10:17 +0000164 try {
165 // Read from the socket
166 fetch = readWant_ - readBufferPos_;
167 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
168 }
169 catch (TTransportException& te) {
170 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
171 close();
Mark Slee79b16942007-11-26 19:05:29 +0000172
David Reiss105961d2010-10-06 17:10:17 +0000173 return;
174 }
175
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 if (got > 0) {
177 // Move along in the buffer
178 readBufferPos_ += got;
179
180 // Check that we did not overdo it
181 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000182
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 // We are done reading, move onto the next state
184 if (readBufferPos_ == readWant_) {
185 transition();
186 }
187 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000188 }
189
190 // Whenever we get down here it means a remote disconnect
191 close();
Mark Slee79b16942007-11-26 19:05:29 +0000192
Mark Slee2f6404d2006-10-10 01:37:40 +0000193 return;
194
195 case SOCKET_SEND:
196 // Should never have position past size
197 assert(writeBufferPos_ <= writeBufferSize_);
198
199 // If there is no data to send, then let us move on
200 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000201 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 transition();
203 return;
204 }
205
David Reiss105961d2010-10-06 17:10:17 +0000206 try {
207 left = writeBufferSize_ - writeBufferPos_;
208 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
209 }
210 catch (TTransportException& te) {
211 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 close();
213 return;
214 }
215
216 writeBufferPos_ += sent;
217
218 // Did we overdo it?
219 assert(writeBufferPos_ <= writeBufferSize_);
220
Mark Slee79b16942007-11-26 19:05:29 +0000221 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 if (writeBufferPos_ == writeBufferSize_) {
223 transition();
224 }
225
226 return;
227
228 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000229 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 assert(0);
231 }
232}
233
234/**
235 * This is called when the application transitions from one state into
236 * another. This means that it has finished writing the data that it needed
237 * to, or finished receiving the data that it needed to.
238 */
239void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000240
241 int sz = 0;
242
Mark Slee2f6404d2006-10-10 01:37:40 +0000243 // Switch upon the state that we are currently in and move to a new state
244 switch (appState_) {
245
246 case APP_READ_REQUEST:
247 // We are done reading the request, package the read buffer into transport
248 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000249 // If we've used these transport buffers enough times, reset them to avoid bloating
250
Mark Slee2f6404d2006-10-10 01:37:40 +0000251 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000252 ++numReadsSinceReset_;
253 if (numWritesSinceReset_ < 512) {
254 outputTransport_->resetBuffer();
255 } else {
256 // reset the capacity of the output transport if we used it enough times that it might be bloated
257 try {
258 outputTransport_->resetBuffer(true);
259 numWritesSinceReset_ = 0;
260 } catch (TTransportException &ttx) {
261 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
262 close();
263 return;
264 }
265 }
266
David Reiss52cb7a72008-06-30 21:40:35 +0000267 // Prepend four bytes of blank space to the buffer so we can
268 // write the frame size there later.
269 outputTransport_->getWritePtr(4);
270 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000271
David Reiss01fe1532010-03-09 05:19:25 +0000272 server_->incrementActiveProcessors();
273
Mark Sleee02385b2007-06-09 01:21:16 +0000274 if (server_->isThreadPoolProcessing()) {
275 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000276
David Reiss01fe1532010-03-09 05:19:25 +0000277 // Create task and dispatch to the thread manager
278 boost::shared_ptr<Runnable> task =
279 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
280 inputProtocol_,
281 outputProtocol_,
282 this));
283 // The application is now waiting on the task to finish
284 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000285
David Reisse11f3072008-10-07 21:39:19 +0000286 try {
287 server_->addTask(task);
288 } catch (IllegalStateException & ise) {
289 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000290 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000291 close();
292 }
Mark Slee402ee282007-08-23 01:43:20 +0000293
David Reiss01fe1532010-03-09 05:19:25 +0000294 // Set this connection idle so that libevent doesn't process more
295 // data on it while we're still waiting for the threadmanager to
296 // finish this task
297 setIdle();
298 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000299 } else {
300 try {
301 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000302 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Mark Sleee02385b2007-06-09 01:21:16 +0000303 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000304 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000305 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000306 close();
307 return;
308 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000309 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000310 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000311 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000312 return;
313 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000314 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000315 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000316 close();
317 return;
318 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000319 }
320
Mark Slee402ee282007-08-23 01:43:20 +0000321 // Intentionally fall through here, the call to process has written into
322 // the writeBuffer_
323
Mark Sleee02385b2007-06-09 01:21:16 +0000324 case APP_WAIT_TASK:
325 // We have now finished processing a task and the result has been written
326 // into the outputTransport_, so we grab its contents and place them into
327 // the writeBuffer_ for actual writing by the libevent thread
328
David Reiss01fe1532010-03-09 05:19:25 +0000329 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000330 // Get the result of the operation
331 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
332
333 // If the function call generated return data, then move into the send
334 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000335 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000336 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000337
338 // Move into write state
339 writeBufferPos_ = 0;
340 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000341
David Reissaf787782008-07-03 20:29:34 +0000342 // Put the frame size into the write buffer
343 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
344 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000345
346 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000347 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 setWrite();
349
350 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000351 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000352
353 return;
354 }
355
David Reissc51986f2009-03-24 20:01:25 +0000356 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000357 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000358 goto LABEL_APP_INIT;
359
Mark Slee2f6404d2006-10-10 01:37:40 +0000360 case APP_SEND_RESULT:
361
Kevin Clark5ace1782009-03-04 21:10:58 +0000362 ++numWritesSinceReset_;
363
Mark Slee2f6404d2006-10-10 01:37:40 +0000364 // N.B.: We also intentionally fall through here into the INIT state!
365
Mark Slee92f00fb2006-10-25 01:28:17 +0000366 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 case APP_INIT:
368
Kevin Clark5ace1782009-03-04 21:10:58 +0000369 // reset the input buffer if we used it enough times that it might be bloated
370 if (numReadsSinceReset_ > 512)
371 {
372 void * new_buffer = std::realloc(readBuffer_, 1024);
373 if (new_buffer == NULL) {
374 GlobalOutput("TConnection::transition() realloc");
375 close();
376 return;
377 }
378 readBuffer_ = (uint8_t*) new_buffer;
379 readBufferSize_ = 1024;
380 numReadsSinceReset_ = 0;
381 }
382
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 // Clear write buffer variables
384 writeBuffer_ = NULL;
385 writeBufferPos_ = 0;
386 writeBufferSize_ = 0;
387
388 // Set up read buffer for getting 4 bytes
389 readBufferPos_ = 0;
390 readWant_ = 4;
391
392 // Into read4 state we go
393 socketState_ = SOCKET_RECV;
394 appState_ = APP_READ_FRAME_SIZE;
395
396 // Register read event
397 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000398
Mark Slee2f6404d2006-10-10 01:37:40 +0000399 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000400 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000401
402 return;
403
404 case APP_READ_FRAME_SIZE:
405 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000406 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000407 sz = (int32_t)ntohl(sz);
408
409 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000410 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000411 close();
412 return;
413 }
414
415 // Reset the read buffer
416 readWant_ = (uint32_t)sz;
417 readBufferPos_= 0;
418
419 // Move into read request state
420 appState_ = APP_READ_REQUEST;
421
422 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000423 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000424
425 return;
426
David Reiss01fe1532010-03-09 05:19:25 +0000427 case APP_CLOSE_CONNECTION:
428 server_->decrementActiveProcessors();
429 close();
430 return;
431
Mark Slee2f6404d2006-10-10 01:37:40 +0000432 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000433 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000434 assert(0);
435 }
436}
437
438void TConnection::setFlags(short eventFlags) {
439 // Catch the do nothing case
440 if (eventFlags_ == eventFlags) {
441 return;
442 }
443
444 // Delete a previously existing event
445 if (eventFlags_ != 0) {
446 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000447 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000448 return;
449 }
450 }
451
452 // Update in memory structure
453 eventFlags_ = eventFlags;
454
Mark Slee402ee282007-08-23 01:43:20 +0000455 // Do not call event_set if there are no flags
456 if (!eventFlags_) {
457 return;
458 }
459
David Reiss01fe1532010-03-09 05:19:25 +0000460 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 * event_set:
462 *
463 * Prepares the event structure &event to be used in future calls to
464 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000465 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000466 *
467 * The events can be either EV_READ, EV_WRITE, or both, indicating
468 * that an application can read or write from the file respectively without
469 * blocking.
470 *
Mark Sleee02385b2007-06-09 01:21:16 +0000471 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000472 * the event and the type of event which will be one of: EV_TIMEOUT,
473 * EV_SIGNAL, EV_READ, EV_WRITE.
474 *
475 * The additional flag EV_PERSIST makes an event_add() persistent until
476 * event_del() has been called.
477 *
478 * Once initialized, the &event struct can be used repeatedly with
479 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000480 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000481 * when an ev structure has been added to libevent using event_add() the
482 * structure must persist until the event occurs (assuming EV_PERSIST
483 * is not set) or is removed using event_del(). You may not reuse the same
484 * ev structure for multiple monitored descriptors; each descriptor needs
485 * its own ev.
486 */
David Reiss105961d2010-10-06 17:10:17 +0000487 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
488 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000489 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000490
491 // Add the event
492 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000493 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 }
495}
496
497/**
498 * Closes a connection
499 */
500void TConnection::close() {
501 // Delete the registered libevent
502 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000503 GlobalOutput.perror("TConnection::close() event_del", errno);
504 }
505
506 if (serverEventHandler_ != NULL) {
507 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000508 }
509
510 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000511 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000512
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000513 // close any factory produced transports
514 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000515 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000516
Mark Slee2f6404d2006-10-10 01:37:40 +0000517 // Give this object back to the server that owns it
518 server_->returnConnection(this);
519}
520
David Reiss01fe1532010-03-09 05:19:25 +0000521void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000522 if (readBufferSize_ > limit) {
523 readBufferSize_ = limit;
524 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
525 if (readBuffer_ == NULL) {
526 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
527 close();
528 }
529 }
530}
531
David Reiss8ede8182010-09-02 15:26:28 +0000532TNonblockingServer::~TNonblockingServer() {
533 // TODO: We currently leak any active TConnection objects.
534 // Since we're shutting down and destroying the event_base, the TConnection
535 // objects will never receive any additional callbacks. (And even if they
536 // did, it would be bad, since they keep a pointer around to the server,
537 // which is being destroyed.)
538
539 // Clean up unused TConnection objects in connectionStack_
540 while (!connectionStack_.empty()) {
541 TConnection* connection = connectionStack_.top();
542 connectionStack_.pop();
543 delete connection;
544 }
545
546 if (eventBase_) {
547 event_base_free(eventBase_);
548 }
549
550 if (serverSocket_ >= 0) {
551 close(serverSocket_);
552 }
553}
554
Mark Slee2f6404d2006-10-10 01:37:40 +0000555/**
556 * Creates a new connection either by reusing an object off the stack or
557 * by allocating a new one entirely
558 */
David Reiss105961d2010-10-06 17:10:17 +0000559TConnection* TNonblockingServer::createConnection(int socket, short flags,
560 const sockaddr* addr,
561 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000562 // Check the stack
563 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000564 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000565 } else {
566 TConnection* result = connectionStack_.top();
567 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000568 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000569 return result;
570 }
571}
572
573/**
574 * Returns a connection to the stack
575 */
576void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000577 if (connectionStackLimit_ &&
578 (connectionStack_.size() >= connectionStackLimit_)) {
579 delete connection;
580 } else {
581 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
582 connectionStack_.push(connection);
583 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000584}
585
586/**
David Reissa79e4882008-03-05 07:51:47 +0000587 * Server socket had something happen. We accept all waiting client
588 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000589 */
590void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000591 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000592 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000593
Mark Slee2f6404d2006-10-10 01:37:40 +0000594 // Server socket accepted a new connection
595 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000596 sockaddr_storage addrStorage;
597 sockaddr* addrp = (sockaddr*)&addrStorage;
598 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000599
Mark Slee2f6404d2006-10-10 01:37:40 +0000600 // Going to accept a new client socket
601 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000602
Mark Slee2f6404d2006-10-10 01:37:40 +0000603 // Accept as many new clients as possible, even though libevent signaled only
604 // one, this helps us to avoid having to go back into the libevent engine so
605 // many times
David Reiss105961d2010-10-06 17:10:17 +0000606 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000607 // If we're overloaded, take action here
608 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
609 nConnectionsDropped_++;
610 nTotalConnectionsDropped_++;
611 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
612 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000613 return;
David Reiss01fe1532010-03-09 05:19:25 +0000614 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
615 if (!drainPendingTask()) {
616 // Nothing left to discard, so we drop connection instead.
617 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000618 return;
David Reiss01fe1532010-03-09 05:19:25 +0000619 }
620 }
621 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 // Explicitly set this socket to NONBLOCK mode
623 int flags;
624 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
625 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000626 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 close(clientSocket);
628 return;
629 }
630
631 // Create a new TConnection for this client socket.
632 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000633 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000634
635 // Fail fast if we could not create a TConnection object
636 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000637 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000638 close(clientSocket);
639 return;
640 }
641
642 // Put this client connection into the proper state
643 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000644
645 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000646 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000647 }
Mark Slee79b16942007-11-26 19:05:29 +0000648
Mark Slee2f6404d2006-10-10 01:37:40 +0000649 // Done looping accept, now we have to make sure the error is due to
650 // blocking. Any other error is a problem
651 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000652 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000653 }
654}
655
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result, then
 * creates the socket, binds it and passes it to listenSocket(int) to finish
 * the setup.
 *
 * If address resolution fails the error is logged and the function returns
 * without creating a socket.
 *
 * @throws TException if socket() or bind() fails (and whatever
 *         listenSocket(int) throws).
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Bounded formatting: an out-of-range port_ value must not be able to
  // write past the end of the small stack buffer.
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry if none is ipv6.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY on the socket; a failure is only logged.
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
720
721/**
722 * Takes a socket created by listenSocket() and sets various options on it
723 * to prepare for use in the server.
724 */
725void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000726 // Set socket to nonblocking mode
727 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000728 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
729 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
730 close(s);
731 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000732 }
733
734 int one = 1;
735 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000736
737 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000738 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000739
740 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000741 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000742
743 // Set TCP nodelay if available, MAC OS X Hack
744 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
745 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000746 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000747 #endif
748
David Reiss1c20c872010-03-09 05:20:14 +0000749 #ifdef TCP_LOW_MIN_RTO
750 if (TSocket::getUseLowMinRto()) {
751 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
752 }
753 #endif
754
Mark Slee79b16942007-11-26 19:05:29 +0000755 if (listen(s, LISTEN_BACKLOG) == -1) {
756 close(s);
757 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000758 }
759
Mark Slee79b16942007-11-26 19:05:29 +0000760 // Cool, this socket is good to go, set it as the serverSocket_
761 serverSocket_ = s;
762}
763
David Reiss01fe1532010-03-09 05:19:25 +0000764void TNonblockingServer::createNotificationPipe() {
765 if (pipe(notificationPipeFDs_) != 0) {
766 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
767 throw TException("can't create notification pipe");
768 }
David Reiss83b8fda2010-03-09 05:19:34 +0000769 int flags;
770 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
771 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
772 close(notificationPipeFDs_[0]);
773 close(notificationPipeFDs_[1]);
774 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
775 }
David Reiss01fe1532010-03-09 05:19:25 +0000776}
777
Mark Slee79b16942007-11-26 19:05:29 +0000778/**
779 * Register the core libevent events onto the proper base.
780 */
781void TNonblockingServer::registerEvents(event_base* base) {
782 assert(serverSocket_ != -1);
783 assert(!eventBase_);
784 eventBase_ = base;
785
786 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000787 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000788 event_get_version(),
789 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000790
791 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000792 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000793 serverSocket_,
794 EV_READ | EV_PERSIST,
795 TNonblockingServer::eventHandler,
796 this);
Mark Slee79b16942007-11-26 19:05:29 +0000797 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000798
799 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000800 if (-1 == event_add(&serverEvent_, 0)) {
801 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000802 }
David Reiss01fe1532010-03-09 05:19:25 +0000803 if (threadPoolProcessing_) {
804 // Create an event to be notified when a task finishes
805 event_set(&notificationEvent_,
806 getNotificationRecvFD(),
807 EV_READ | EV_PERSIST,
808 TConnection::taskHandler,
809 this);
David Reiss1c20c872010-03-09 05:20:14 +0000810
David Reiss01fe1532010-03-09 05:19:25 +0000811 // Attach to the base
812 event_base_set(eventBase_, &notificationEvent_);
813
814 // Add the event and start up the server
815 if (-1 == event_add(&notificationEvent_, 0)) {
816 throw TException("TNonblockingServer::serve(): notification event_add fail");
817 }
818 }
819}
820
David Reiss068f4162010-03-09 05:19:45 +0000821void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
822 threadManager_ = threadManager;
823 if (threadManager != NULL) {
824 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
825 threadPoolProcessing_ = true;
826 } else {
827 threadPoolProcessing_ = false;
828 }
829}
830
David Reiss01fe1532010-03-09 05:19:25 +0000831bool TNonblockingServer::serverOverloaded() {
832 size_t activeConnections = numTConnections_ - connectionStack_.size();
833 if (numActiveProcessors_ > maxActiveProcessors_ ||
834 activeConnections > maxConnections_) {
835 if (!overloaded_) {
836 GlobalOutput.printf("thrift non-blocking server overload condition");
837 overloaded_ = true;
838 }
839 } else {
840 if (overloaded_ &&
841 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
842 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
843 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
844 nConnectionsDropped_, nTotalConnectionsDropped_);
845 nConnectionsDropped_ = 0;
846 overloaded_ = false;
847 }
848 }
849
850 return overloaded_;
851}
852
853bool TNonblockingServer::drainPendingTask() {
854 if (threadManager_) {
855 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
856 if (task) {
857 TConnection* connection =
858 static_cast<TConnection::Task*>(task.get())->getTConnection();
859 assert(connection && connection->getServer()
860 && connection->getState() == APP_WAIT_TASK);
861 connection->forceClose();
862 return true;
863 }
864 }
865 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000866}
867
David Reiss068f4162010-03-09 05:19:45 +0000868void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
869 TConnection* connection =
870 static_cast<TConnection::Task*>(task.get())->getTConnection();
871 assert(connection && connection->getServer()
872 && connection->getState() == APP_WAIT_TASK);
873 connection->forceClose();
874}
875
Mark Slee79b16942007-11-26 19:05:29 +0000876/**
877 * Main workhorse function, starts up the server listening on a port and
878 * loops over the libevent handler.
879 */
880void TNonblockingServer::serve() {
881 // Init socket
882 listenSocket();
883
David Reiss01fe1532010-03-09 05:19:25 +0000884 if (threadPoolProcessing_) {
885 // Init task completion notification pipe
886 createNotificationPipe();
887 }
888
Mark Slee79b16942007-11-26 19:05:29 +0000889 // Initialize libevent core
890 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000891
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000892 // Run the preServe event
893 if (eventHandler_ != NULL) {
894 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000895 }
896
Mark Sleee02385b2007-06-09 01:21:16 +0000897 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000898 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000899}
900
T Jake Lucianib5e62212009-01-31 22:36:20 +0000901}}} // apache::thrift::server