blob: 41056ab33d7e81a1e42ad2f74c70b08556173fd5 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000055 connection_(connection),
56 serverEventHandler_(connection_->getServerEventHandler()),
57 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000058
59 void run() {
60 try {
David Reiss105961d2010-10-06 17:10:17 +000061 for (;;) {
62 if (serverEventHandler_ != NULL) {
63 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
64 }
65 if (!processor_->process(input_, output_, connectionContext_) ||
66 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000067 break;
68 }
69 }
70 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000071 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000072 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000073 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000074 } catch (bad_alloc&) {
75 cerr << "TNonblockingServer caught bad_alloc exception.";
76 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000077 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000078 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000079 }
Mark Slee79b16942007-11-26 19:05:29 +000080
David Reiss01fe1532010-03-09 05:19:25 +000081 // Signal completion back to the libevent thread via a pipe
82 if (!connection_->notifyServer()) {
83 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000084 }
David Reiss01fe1532010-03-09 05:19:25 +000085 }
86
87 TConnection* getTConnection() {
88 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000089 }
90
91 private:
92 boost::shared_ptr<TProcessor> processor_;
93 boost::shared_ptr<TProtocol> input_;
94 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000095 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +000096 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
97 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +000098};
Mark Slee5ea15f92007-03-05 22:55:59 +000099
David Reiss105961d2010-10-06 17:10:17 +0000100void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
101 const sockaddr* addr, socklen_t addrLen) {
102 tSocket_->setSocketFD(socket);
103 tSocket_->setCachedAddress(addr, addrLen);
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 server_ = s;
106 appState_ = APP_INIT;
107 eventFlags_ = 0;
108
109 readBufferPos_ = 0;
110 readWant_ = 0;
111
112 writeBuffer_ = NULL;
113 writeBufferSize_ = 0;
114 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000115 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000116
David Reiss89a12942010-10-06 17:10:52 +0000117 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000119 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000120
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 // Set flags, which also registers the event
122 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000123
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000124 // get input/transports
125 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
126 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000127
128 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000129 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
130 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000131
132 // Set up for any server event handler
133 serverEventHandler_ = server_->getEventHandler();
134 if (serverEventHandler_ != NULL) {
135 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
136 } else {
137 connectionContext_ = NULL;
138 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000139}
140
141void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000142 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000143 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000144
145 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000146 case SOCKET_RECV_FRAMING:
147 union {
148 uint8_t buf[sizeof(uint32_t)];
149 int32_t size;
150 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000151
David Reiss89a12942010-10-06 17:10:52 +0000152 // if we've already received some bytes we kept them here
153 framing.size = readWant_;
154 // determine size of this frame
155 try {
156 // Read from the socket
157 fetch = tSocket_->read(&framing.buf[readBufferPos_],
158 uint32_t(sizeof(framing.size) - readBufferPos_));
159 if (fetch == 0) {
160 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 close();
162 return;
163 }
David Reiss89a12942010-10-06 17:10:52 +0000164 readBufferPos_ += fetch;
165 } catch (TTransportException& te) {
166 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
167 close();
168
169 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000170 }
171
David Reiss89a12942010-10-06 17:10:52 +0000172 if (readBufferPos_ < sizeof(framing.size)) {
173 // more needed before frame size is known -- save what we have so far
174 readWant_ = framing.size;
175 return;
176 }
177
178 readWant_ = ntohl(framing.size);
179 if (static_cast<int>(readWant_) <= 0) {
180 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
181 close();
182 return;
183 }
184 // size known; now get the rest of the frame
185 transition();
186 return;
187
188 case SOCKET_RECV:
189 // It is an error to be in this state if we already have all the data
190 assert(readBufferPos_ < readWant_);
191
David Reiss105961d2010-10-06 17:10:17 +0000192 try {
193 // Read from the socket
194 fetch = readWant_ - readBufferPos_;
195 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
196 }
197 catch (TTransportException& te) {
198 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
199 close();
Mark Slee79b16942007-11-26 19:05:29 +0000200
David Reiss105961d2010-10-06 17:10:17 +0000201 return;
202 }
203
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 if (got > 0) {
205 // Move along in the buffer
206 readBufferPos_ += got;
207
208 // Check that we did not overdo it
209 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 // We are done reading, move onto the next state
212 if (readBufferPos_ == readWant_) {
213 transition();
214 }
215 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000216 }
217
218 // Whenever we get down here it means a remote disconnect
219 close();
Mark Slee79b16942007-11-26 19:05:29 +0000220
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 return;
222
223 case SOCKET_SEND:
224 // Should never have position past size
225 assert(writeBufferPos_ <= writeBufferSize_);
226
227 // If there is no data to send, then let us move on
228 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000229 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 transition();
231 return;
232 }
233
David Reiss105961d2010-10-06 17:10:17 +0000234 try {
235 left = writeBufferSize_ - writeBufferPos_;
236 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
237 }
238 catch (TTransportException& te) {
239 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000240 close();
241 return;
242 }
243
244 writeBufferPos_ += sent;
245
246 // Did we overdo it?
247 assert(writeBufferPos_ <= writeBufferSize_);
248
Mark Slee79b16942007-11-26 19:05:29 +0000249 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 if (writeBufferPos_ == writeBufferSize_) {
251 transition();
252 }
253
254 return;
255
256 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000257 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000258 assert(0);
259 }
260}
261
262/**
263 * This is called when the application transitions from one state into
264 * another. This means that it has finished writing the data that it needed
265 * to, or finished receiving the data that it needed to.
266 */
267void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000268
269 int sz = 0;
270
Mark Slee2f6404d2006-10-10 01:37:40 +0000271 // Switch upon the state that we are currently in and move to a new state
272 switch (appState_) {
273
274 case APP_READ_REQUEST:
275 // We are done reading the request, package the read buffer into transport
276 // and get back some data from the dispatch function
277 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000278 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000279 // Prepend four bytes of blank space to the buffer so we can
280 // write the frame size there later.
281 outputTransport_->getWritePtr(4);
282 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000283
David Reiss01fe1532010-03-09 05:19:25 +0000284 server_->incrementActiveProcessors();
285
Mark Sleee02385b2007-06-09 01:21:16 +0000286 if (server_->isThreadPoolProcessing()) {
287 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000288
David Reiss01fe1532010-03-09 05:19:25 +0000289 // Create task and dispatch to the thread manager
290 boost::shared_ptr<Runnable> task =
291 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
292 inputProtocol_,
293 outputProtocol_,
294 this));
295 // The application is now waiting on the task to finish
296 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000297
David Reisse11f3072008-10-07 21:39:19 +0000298 try {
299 server_->addTask(task);
300 } catch (IllegalStateException & ise) {
301 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000302 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000303 close();
304 }
Mark Slee402ee282007-08-23 01:43:20 +0000305
David Reiss01fe1532010-03-09 05:19:25 +0000306 // Set this connection idle so that libevent doesn't process more
307 // data on it while we're still waiting for the threadmanager to
308 // finish this task
309 setIdle();
310 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000311 } else {
312 try {
313 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000314 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Mark Sleee02385b2007-06-09 01:21:16 +0000315 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000316 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000317 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000318 close();
319 return;
320 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000321 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000322 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000323 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000324 return;
325 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000326 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000327 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000328 close();
329 return;
330 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000331 }
332
Mark Slee402ee282007-08-23 01:43:20 +0000333 // Intentionally fall through here, the call to process has written into
334 // the writeBuffer_
335
Mark Sleee02385b2007-06-09 01:21:16 +0000336 case APP_WAIT_TASK:
337 // We have now finished processing a task and the result has been written
338 // into the outputTransport_, so we grab its contents and place them into
339 // the writeBuffer_ for actual writing by the libevent thread
340
David Reiss01fe1532010-03-09 05:19:25 +0000341 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000342 // Get the result of the operation
343 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
344
345 // If the function call generated return data, then move into the send
346 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000347 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000348 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000349
350 // Move into write state
351 writeBufferPos_ = 0;
352 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000353
David Reissaf787782008-07-03 20:29:34 +0000354 // Put the frame size into the write buffer
355 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
356 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000357
358 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000359 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000360 setWrite();
361
362 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000363 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000364
365 return;
366 }
367
David Reissc51986f2009-03-24 20:01:25 +0000368 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000369 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000370 goto LABEL_APP_INIT;
371
Mark Slee2f6404d2006-10-10 01:37:40 +0000372 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000373 // it's now safe to perform buffer size housekeeping.
374 if (writeBufferSize_ > largestWriteBufferSize_) {
375 largestWriteBufferSize_ = writeBufferSize_;
376 }
377 if (server_->getResizeBufferEveryN() > 0
378 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
379 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
380 server_->getIdleWriteBufferLimit());
381 callsForResize_ = 0;
382 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000383
384 // N.B.: We also intentionally fall through here into the INIT state!
385
Mark Slee92f00fb2006-10-25 01:28:17 +0000386 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 case APP_INIT:
388
389 // Clear write buffer variables
390 writeBuffer_ = NULL;
391 writeBufferPos_ = 0;
392 writeBufferSize_ = 0;
393
Mark Slee2f6404d2006-10-10 01:37:40 +0000394 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000395 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 appState_ = APP_READ_FRAME_SIZE;
397
David Reiss89a12942010-10-06 17:10:52 +0000398 readBufferPos_ = 0;
399
Mark Slee2f6404d2006-10-10 01:37:40 +0000400 // Register read event
401 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000402
Mark Slee2f6404d2006-10-10 01:37:40 +0000403 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000404 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000405
406 return;
407
408 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000409 // We just read the request length
410 // Double the buffer size until it is big enough
411 if (readWant_ > readBufferSize_) {
412 if (readBufferSize_ == 0) {
413 readBufferSize_ = 1;
414 }
415 uint32_t newSize = readBufferSize_;
416 while (readWant_ > newSize) {
417 newSize *= 2;
418 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000419
David Reiss89a12942010-10-06 17:10:52 +0000420 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
421 if (newBuffer == NULL) {
422 // nothing else to be done...
423 throw std::bad_alloc();
424 }
425 readBuffer_ = newBuffer;
426 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000427 }
428
Mark Slee2f6404d2006-10-10 01:37:40 +0000429 readBufferPos_= 0;
430
431 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000432 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000433 appState_ = APP_READ_REQUEST;
434
435 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000436 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000437
438 return;
439
David Reiss01fe1532010-03-09 05:19:25 +0000440 case APP_CLOSE_CONNECTION:
441 server_->decrementActiveProcessors();
442 close();
443 return;
444
Mark Slee2f6404d2006-10-10 01:37:40 +0000445 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000446 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000447 assert(0);
448 }
449}
450
451void TConnection::setFlags(short eventFlags) {
452 // Catch the do nothing case
453 if (eventFlags_ == eventFlags) {
454 return;
455 }
456
457 // Delete a previously existing event
458 if (eventFlags_ != 0) {
459 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000460 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 return;
462 }
463 }
464
465 // Update in memory structure
466 eventFlags_ = eventFlags;
467
Mark Slee402ee282007-08-23 01:43:20 +0000468 // Do not call event_set if there are no flags
469 if (!eventFlags_) {
470 return;
471 }
472
David Reiss01fe1532010-03-09 05:19:25 +0000473 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000474 * event_set:
475 *
476 * Prepares the event structure &event to be used in future calls to
477 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000478 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000479 *
480 * The events can be either EV_READ, EV_WRITE, or both, indicating
481 * that an application can read or write from the file respectively without
482 * blocking.
483 *
Mark Sleee02385b2007-06-09 01:21:16 +0000484 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000485 * the event and the type of event which will be one of: EV_TIMEOUT,
486 * EV_SIGNAL, EV_READ, EV_WRITE.
487 *
488 * The additional flag EV_PERSIST makes an event_add() persistent until
489 * event_del() has been called.
490 *
491 * Once initialized, the &event struct can be used repeatedly with
492 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000493 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 * when an ev structure has been added to libevent using event_add() the
495 * structure must persist until the event occurs (assuming EV_PERSIST
496 * is not set) or is removed using event_del(). You may not reuse the same
497 * ev structure for multiple monitored descriptors; each descriptor needs
498 * its own ev.
499 */
David Reiss105961d2010-10-06 17:10:17 +0000500 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
501 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000502 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000503
504 // Add the event
505 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000506 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000507 }
508}
509
510/**
511 * Closes a connection
512 */
513void TConnection::close() {
514 // Delete the registered libevent
515 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000516 GlobalOutput.perror("TConnection::close() event_del", errno);
517 }
518
519 if (serverEventHandler_ != NULL) {
520 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000521 }
522
523 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000524 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000525
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000526 // close any factory produced transports
527 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000528 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000529
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 // Give this object back to the server that owns it
531 server_->returnConnection(this);
532}
533
David Reiss54bec5d2010-10-06 17:10:45 +0000534void TConnection::checkIdleBufferMemLimit(size_t readLimit,
535 size_t writeLimit) {
536 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000537 free(readBuffer_);
538 readBuffer_ = NULL;
539 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000540 }
David Reiss54bec5d2010-10-06 17:10:45 +0000541
542 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
543 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000544 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000545 largestWriteBufferSize_ = 0;
546 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000547}
548
David Reiss8ede8182010-09-02 15:26:28 +0000549TNonblockingServer::~TNonblockingServer() {
550 // TODO: We currently leak any active TConnection objects.
551 // Since we're shutting down and destroying the event_base, the TConnection
552 // objects will never receive any additional callbacks. (And even if they
553 // did, it would be bad, since they keep a pointer around to the server,
554 // which is being destroyed.)
555
556 // Clean up unused TConnection objects in connectionStack_
557 while (!connectionStack_.empty()) {
558 TConnection* connection = connectionStack_.top();
559 connectionStack_.pop();
560 delete connection;
561 }
562
563 if (eventBase_) {
564 event_base_free(eventBase_);
565 }
566
567 if (serverSocket_ >= 0) {
568 close(serverSocket_);
569 }
570}
571
Mark Slee2f6404d2006-10-10 01:37:40 +0000572/**
573 * Creates a new connection either by reusing an object off the stack or
574 * by allocating a new one entirely
575 */
David Reiss105961d2010-10-06 17:10:17 +0000576TConnection* TNonblockingServer::createConnection(int socket, short flags,
577 const sockaddr* addr,
578 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000579 // Check the stack
580 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000581 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000582 } else {
583 TConnection* result = connectionStack_.top();
584 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000585 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000586 return result;
587 }
588}
589
590/**
591 * Returns a connection to the stack
592 */
593void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000594 if (connectionStackLimit_ &&
595 (connectionStack_.size() >= connectionStackLimit_)) {
596 delete connection;
597 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000598 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000599 connectionStack_.push(connection);
600 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000601}
602
603/**
David Reissa79e4882008-03-05 07:51:47 +0000604 * Server socket had something happen. We accept all waiting client
605 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000606 */
607void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000608 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000609 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000610
Mark Slee2f6404d2006-10-10 01:37:40 +0000611 // Server socket accepted a new connection
612 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000613 sockaddr_storage addrStorage;
614 sockaddr* addrp = (sockaddr*)&addrStorage;
615 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000616
Mark Slee2f6404d2006-10-10 01:37:40 +0000617 // Going to accept a new client socket
618 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000619
Mark Slee2f6404d2006-10-10 01:37:40 +0000620 // Accept as many new clients as possible, even though libevent signaled only
621 // one, this helps us to avoid having to go back into the libevent engine so
622 // many times
David Reiss105961d2010-10-06 17:10:17 +0000623 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000624 // If we're overloaded, take action here
625 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
626 nConnectionsDropped_++;
627 nTotalConnectionsDropped_++;
628 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
629 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000630 return;
David Reiss01fe1532010-03-09 05:19:25 +0000631 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
632 if (!drainPendingTask()) {
633 // Nothing left to discard, so we drop connection instead.
634 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000635 return;
David Reiss01fe1532010-03-09 05:19:25 +0000636 }
637 }
638 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000639 // Explicitly set this socket to NONBLOCK mode
640 int flags;
641 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
642 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000643 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000644 close(clientSocket);
645 return;
646 }
647
648 // Create a new TConnection for this client socket.
649 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000650 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000651
652 // Fail fast if we could not create a TConnection object
653 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000654 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000655 close(clientSocket);
656 return;
657 }
658
659 // Put this client connection into the proper state
660 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000661
662 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000663 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000664 }
Mark Slee79b16942007-11-26 19:05:29 +0000665
Mark Slee2f6404d2006-10-10 01:37:40 +0000666 // Done looping accept, now we have to make sure the error is due to
667 // blocking. Any other error is a problem
668 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000669 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000670 }
671}
672
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * The wildcard address for port_ is resolved with getaddrinfo(); an IPv6
 * result is preferred since ipv4 addresses can be mapped into ipv6 space.
 * The bound socket is then passed to listenSocket(int) for further setup.
 *
 * @throws TException if address resolution, socket creation or bind fails
 */
void TNonblockingServer::listenSocket() {
  // Render the port as a service string. The buffer fits any int and
  // snprintf bounds the write, so an out-of-range port_ cannot overflow it.
  char port[sizeof("-2147483648")];
  snprintf(port, sizeof(port), "%d", port_);

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // Wildcard address
  struct addrinfo* res0 = NULL;
  const int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Without an address there is nothing to listen on: report the failure
    // and raise it like the socket()/bind() failures below.
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  struct addrinfo* res;
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  const int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Turn V6ONLY off so the IPv6 socket can also serve mapped IPv4 clients;
  // failure is only reported.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart (best effort)
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
737
738/**
739 * Takes a socket created by listenSocket() and sets various options on it
740 * to prepare for use in the server.
741 */
742void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000743 // Set socket to nonblocking mode
744 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000745 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
746 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
747 close(s);
748 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000749 }
750
751 int one = 1;
752 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000753
754 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000755 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000756
757 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000758 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000759
760 // Set TCP nodelay if available, MAC OS X Hack
761 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
762 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000763 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000764 #endif
765
David Reiss1c20c872010-03-09 05:20:14 +0000766 #ifdef TCP_LOW_MIN_RTO
767 if (TSocket::getUseLowMinRto()) {
768 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
769 }
770 #endif
771
Mark Slee79b16942007-11-26 19:05:29 +0000772 if (listen(s, LISTEN_BACKLOG) == -1) {
773 close(s);
774 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000775 }
776
Mark Slee79b16942007-11-26 19:05:29 +0000777 // Cool, this socket is good to go, set it as the serverSocket_
778 serverSocket_ = s;
779}
780
David Reiss01fe1532010-03-09 05:19:25 +0000781void TNonblockingServer::createNotificationPipe() {
782 if (pipe(notificationPipeFDs_) != 0) {
783 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
784 throw TException("can't create notification pipe");
785 }
David Reiss83b8fda2010-03-09 05:19:34 +0000786 int flags;
787 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
788 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
789 close(notificationPipeFDs_[0]);
790 close(notificationPipeFDs_[1]);
791 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
792 }
David Reiss01fe1532010-03-09 05:19:25 +0000793}
794
Mark Slee79b16942007-11-26 19:05:29 +0000795/**
796 * Register the core libevent events onto the proper base.
797 */
798void TNonblockingServer::registerEvents(event_base* base) {
799 assert(serverSocket_ != -1);
800 assert(!eventBase_);
801 eventBase_ = base;
802
803 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000804 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000805 event_get_version(),
806 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000807
808 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000809 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000810 serverSocket_,
811 EV_READ | EV_PERSIST,
812 TNonblockingServer::eventHandler,
813 this);
Mark Slee79b16942007-11-26 19:05:29 +0000814 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000815
816 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000817 if (-1 == event_add(&serverEvent_, 0)) {
818 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000819 }
David Reiss01fe1532010-03-09 05:19:25 +0000820 if (threadPoolProcessing_) {
821 // Create an event to be notified when a task finishes
822 event_set(&notificationEvent_,
823 getNotificationRecvFD(),
824 EV_READ | EV_PERSIST,
825 TConnection::taskHandler,
826 this);
David Reiss1c20c872010-03-09 05:20:14 +0000827
David Reiss01fe1532010-03-09 05:19:25 +0000828 // Attach to the base
829 event_base_set(eventBase_, &notificationEvent_);
830
831 // Add the event and start up the server
832 if (-1 == event_add(&notificationEvent_, 0)) {
833 throw TException("TNonblockingServer::serve(): notification event_add fail");
834 }
835 }
836}
837
David Reiss068f4162010-03-09 05:19:45 +0000838void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
839 threadManager_ = threadManager;
840 if (threadManager != NULL) {
841 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
842 threadPoolProcessing_ = true;
843 } else {
844 threadPoolProcessing_ = false;
845 }
846}
847
David Reiss01fe1532010-03-09 05:19:25 +0000848bool TNonblockingServer::serverOverloaded() {
849 size_t activeConnections = numTConnections_ - connectionStack_.size();
850 if (numActiveProcessors_ > maxActiveProcessors_ ||
851 activeConnections > maxConnections_) {
852 if (!overloaded_) {
853 GlobalOutput.printf("thrift non-blocking server overload condition");
854 overloaded_ = true;
855 }
856 } else {
857 if (overloaded_ &&
858 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
859 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
860 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
861 nConnectionsDropped_, nTotalConnectionsDropped_);
862 nConnectionsDropped_ = 0;
863 overloaded_ = false;
864 }
865 }
866
867 return overloaded_;
868}
869
870bool TNonblockingServer::drainPendingTask() {
871 if (threadManager_) {
872 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
873 if (task) {
874 TConnection* connection =
875 static_cast<TConnection::Task*>(task.get())->getTConnection();
876 assert(connection && connection->getServer()
877 && connection->getState() == APP_WAIT_TASK);
878 connection->forceClose();
879 return true;
880 }
881 }
882 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000883}
884
David Reiss068f4162010-03-09 05:19:45 +0000885void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
886 TConnection* connection =
887 static_cast<TConnection::Task*>(task.get())->getTConnection();
888 assert(connection && connection->getServer()
889 && connection->getState() == APP_WAIT_TASK);
890 connection->forceClose();
891}
892
Mark Slee79b16942007-11-26 19:05:29 +0000893/**
894 * Main workhorse function, starts up the server listening on a port and
895 * loops over the libevent handler.
896 */
897void TNonblockingServer::serve() {
898 // Init socket
899 listenSocket();
900
David Reiss01fe1532010-03-09 05:19:25 +0000901 if (threadPoolProcessing_) {
902 // Init task completion notification pipe
903 createNotificationPipe();
904 }
905
Mark Slee79b16942007-11-26 19:05:29 +0000906 // Initialize libevent core
907 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000908
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000909 // Run the preServe event
910 if (eventHandler_ != NULL) {
911 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000912 }
913
Mark Sleee02385b2007-06-09 01:21:16 +0000914 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000915 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000916}
917
T Jake Lucianib5e62212009-01-31 22:36:20 +0000918}}} // apache::thrift::server