blob: a9c1c159b3f7e97911c487a96875df75d85a7ffe [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000022
Mark Sleee02385b2007-06-09 01:21:16 +000023#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000024#include <sys/socket.h>
25#include <netinet/in.h>
26#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000027#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000028#include <fcntl.h>
29#include <errno.h>
30#include <assert.h>
31
David Reiss9b903442009-10-21 05:51:28 +000032#ifndef AF_LOCAL
33#define AF_LOCAL AF_UNIX
34#endif
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::protocol;
39using namespace apache::thrift::transport;
40using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000041using namespace std;
42
43class TConnection::Task: public Runnable {
44 public:
45 Task(boost::shared_ptr<TProcessor> processor,
46 boost::shared_ptr<TProtocol> input,
47 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000048 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000049 processor_(processor),
50 input_(input),
51 output_(output),
David Reiss01fe1532010-03-09 05:19:25 +000052 connection_(connection) {}
Mark Sleee02385b2007-06-09 01:21:16 +000053
54 void run() {
55 try {
56 while (processor_->process(input_, output_)) {
57 if (!input_->getTransport()->peek()) {
58 break;
59 }
60 }
61 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000062 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000063 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000064 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000065 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000066 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000067 }
Mark Slee79b16942007-11-26 19:05:29 +000068
David Reiss01fe1532010-03-09 05:19:25 +000069 // Signal completion back to the libevent thread via a pipe
70 if (!connection_->notifyServer()) {
71 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000072 }
David Reiss01fe1532010-03-09 05:19:25 +000073 }
74
75 TConnection* getTConnection() {
76 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000077 }
78
79 private:
80 boost::shared_ptr<TProcessor> processor_;
81 boost::shared_ptr<TProtocol> input_;
82 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000083 TConnection* connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000084};
Mark Slee5ea15f92007-03-05 22:55:59 +000085
86void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000087 socket_ = socket;
88 server_ = s;
89 appState_ = APP_INIT;
90 eventFlags_ = 0;
91
92 readBufferPos_ = 0;
93 readWant_ = 0;
94
95 writeBuffer_ = NULL;
96 writeBufferSize_ = 0;
97 writeBufferPos_ = 0;
98
99 socketState_ = SOCKET_RECV;
100 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000101
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 // Set flags, which also registers the event
103 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000104
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000105 // get input/transports
106 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
107 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000108
109 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000110 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
111 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000112}
113
114void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000115 int flags=0, got=0, left=0, sent=0;
116 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000117
118 switch (socketState_) {
119 case SOCKET_RECV:
120 // It is an error to be in this state if we already have all the data
121 assert(readBufferPos_ < readWant_);
122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000124 if (readWant_ > readBufferSize_) {
125 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 readBufferSize_ *= 2;
127 }
David Reissd7a16f42008-02-19 22:47:29 +0000128 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000130 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 close();
132 return;
133 }
134 }
135
136 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000137 fetch = readWant_ - readBufferPos_;
138 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000139
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 if (got > 0) {
141 // Move along in the buffer
142 readBufferPos_ += got;
143
144 // Check that we did not overdo it
145 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000146
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 // We are done reading, move onto the next state
148 if (readBufferPos_ == readWant_) {
149 transition();
150 }
151 return;
152 } else if (got == -1) {
153 // Blocking errors are okay, just move on
154 if (errno == EAGAIN || errno == EWOULDBLOCK) {
155 return;
156 }
157
158 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000159 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 }
161 }
162
163 // Whenever we get down here it means a remote disconnect
164 close();
Mark Slee79b16942007-11-26 19:05:29 +0000165
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 return;
167
168 case SOCKET_SEND:
169 // Should never have position past size
170 assert(writeBufferPos_ <= writeBufferSize_);
171
172 // If there is no data to send, then let us move on
173 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000174 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 transition();
176 return;
177 }
178
179 flags = 0;
180 #ifdef MSG_NOSIGNAL
181 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
182 // check for the EPIPE return condition and close the socket in that case
183 flags |= MSG_NOSIGNAL;
184 #endif // ifdef MSG_NOSIGNAL
185
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000186 left = writeBufferSize_ - writeBufferPos_;
187 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000188
189 if (sent <= 0) {
190 // Blocking errors are okay, just move on
191 if (errno == EAGAIN || errno == EWOULDBLOCK) {
192 return;
193 }
194 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000195 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 }
197 close();
198 return;
199 }
200
201 writeBufferPos_ += sent;
202
203 // Did we overdo it?
204 assert(writeBufferPos_ <= writeBufferSize_);
205
Mark Slee79b16942007-11-26 19:05:29 +0000206 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000207 if (writeBufferPos_ == writeBufferSize_) {
208 transition();
209 }
210
211 return;
212
213 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000214 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000215 assert(0);
216 }
217}
218
219/**
220 * This is called when the application transitions from one state into
221 * another. This means that it has finished writing the data that it needed
222 * to, or finished receiving the data that it needed to.
223 */
224void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000225
226 int sz = 0;
227
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 // Switch upon the state that we are currently in and move to a new state
229 switch (appState_) {
230
231 case APP_READ_REQUEST:
232 // We are done reading the request, package the read buffer into transport
233 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000234 // If we've used these transport buffers enough times, reset them to avoid bloating
235
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000237 ++numReadsSinceReset_;
238 if (numWritesSinceReset_ < 512) {
239 outputTransport_->resetBuffer();
240 } else {
241 // reset the capacity of the output transport if we used it enough times that it might be bloated
242 try {
243 outputTransport_->resetBuffer(true);
244 numWritesSinceReset_ = 0;
245 } catch (TTransportException &ttx) {
246 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
247 close();
248 return;
249 }
250 }
251
David Reiss52cb7a72008-06-30 21:40:35 +0000252 // Prepend four bytes of blank space to the buffer so we can
253 // write the frame size there later.
254 outputTransport_->getWritePtr(4);
255 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000256
David Reiss01fe1532010-03-09 05:19:25 +0000257 server_->incrementActiveProcessors();
258
Mark Sleee02385b2007-06-09 01:21:16 +0000259 if (server_->isThreadPoolProcessing()) {
260 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000261
David Reiss01fe1532010-03-09 05:19:25 +0000262 // Create task and dispatch to the thread manager
263 boost::shared_ptr<Runnable> task =
264 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
265 inputProtocol_,
266 outputProtocol_,
267 this));
268 // The application is now waiting on the task to finish
269 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000270
David Reisse11f3072008-10-07 21:39:19 +0000271 try {
272 server_->addTask(task);
273 } catch (IllegalStateException & ise) {
274 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000275 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000276 close();
277 }
Mark Slee402ee282007-08-23 01:43:20 +0000278
David Reiss01fe1532010-03-09 05:19:25 +0000279 // Set this connection idle so that libevent doesn't process more
280 // data on it while we're still waiting for the threadmanager to
281 // finish this task
282 setIdle();
283 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000284 } else {
285 try {
286 // Invoke the processor
287 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
288 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000289 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000290 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000291 close();
292 return;
293 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000294 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000295 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000296 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000297 return;
298 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000299 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000300 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000301 close();
302 return;
303 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000304 }
305
Mark Slee402ee282007-08-23 01:43:20 +0000306 // Intentionally fall through here, the call to process has written into
307 // the writeBuffer_
308
Mark Sleee02385b2007-06-09 01:21:16 +0000309 case APP_WAIT_TASK:
310 // We have now finished processing a task and the result has been written
311 // into the outputTransport_, so we grab its contents and place them into
312 // the writeBuffer_ for actual writing by the libevent thread
313
David Reiss01fe1532010-03-09 05:19:25 +0000314 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000315 // Get the result of the operation
316 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
317
318 // If the function call generated return data, then move into the send
319 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000320 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000321 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000322
323 // Move into write state
324 writeBufferPos_ = 0;
325 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000326
David Reissaf787782008-07-03 20:29:34 +0000327 // Put the frame size into the write buffer
328 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
329 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000330
331 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000332 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000333 setWrite();
334
335 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000336 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000337
338 return;
339 }
340
David Reissc51986f2009-03-24 20:01:25 +0000341 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000342 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000343 goto LABEL_APP_INIT;
344
Mark Slee2f6404d2006-10-10 01:37:40 +0000345 case APP_SEND_RESULT:
346
Kevin Clark5ace1782009-03-04 21:10:58 +0000347 ++numWritesSinceReset_;
348
Mark Slee2f6404d2006-10-10 01:37:40 +0000349 // N.B.: We also intentionally fall through here into the INIT state!
350
Mark Slee92f00fb2006-10-25 01:28:17 +0000351 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000352 case APP_INIT:
353
Kevin Clark5ace1782009-03-04 21:10:58 +0000354 // reset the input buffer if we used it enough times that it might be bloated
355 if (numReadsSinceReset_ > 512)
356 {
357 void * new_buffer = std::realloc(readBuffer_, 1024);
358 if (new_buffer == NULL) {
359 GlobalOutput("TConnection::transition() realloc");
360 close();
361 return;
362 }
363 readBuffer_ = (uint8_t*) new_buffer;
364 readBufferSize_ = 1024;
365 numReadsSinceReset_ = 0;
366 }
367
Mark Slee2f6404d2006-10-10 01:37:40 +0000368 // Clear write buffer variables
369 writeBuffer_ = NULL;
370 writeBufferPos_ = 0;
371 writeBufferSize_ = 0;
372
373 // Set up read buffer for getting 4 bytes
374 readBufferPos_ = 0;
375 readWant_ = 4;
376
377 // Into read4 state we go
378 socketState_ = SOCKET_RECV;
379 appState_ = APP_READ_FRAME_SIZE;
380
381 // Register read event
382 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000383
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000385 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000386
387 return;
388
389 case APP_READ_FRAME_SIZE:
390 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000391 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000392 sz = (int32_t)ntohl(sz);
393
394 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000395 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 close();
397 return;
398 }
399
400 // Reset the read buffer
401 readWant_ = (uint32_t)sz;
402 readBufferPos_= 0;
403
404 // Move into read request state
405 appState_ = APP_READ_REQUEST;
406
407 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000408 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000409
410 return;
411
David Reiss01fe1532010-03-09 05:19:25 +0000412 case APP_CLOSE_CONNECTION:
413 server_->decrementActiveProcessors();
414 close();
415 return;
416
Mark Slee2f6404d2006-10-10 01:37:40 +0000417 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000418 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000419 assert(0);
420 }
421}
422
423void TConnection::setFlags(short eventFlags) {
424 // Catch the do nothing case
425 if (eventFlags_ == eventFlags) {
426 return;
427 }
428
429 // Delete a previously existing event
430 if (eventFlags_ != 0) {
431 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000432 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000433 return;
434 }
435 }
436
437 // Update in memory structure
438 eventFlags_ = eventFlags;
439
Mark Slee402ee282007-08-23 01:43:20 +0000440 // Do not call event_set if there are no flags
441 if (!eventFlags_) {
442 return;
443 }
444
David Reiss01fe1532010-03-09 05:19:25 +0000445 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000446 * event_set:
447 *
448 * Prepares the event structure &event to be used in future calls to
449 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000450 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 *
452 * The events can be either EV_READ, EV_WRITE, or both, indicating
453 * that an application can read or write from the file respectively without
454 * blocking.
455 *
Mark Sleee02385b2007-06-09 01:21:16 +0000456 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000457 * the event and the type of event which will be one of: EV_TIMEOUT,
458 * EV_SIGNAL, EV_READ, EV_WRITE.
459 *
460 * The additional flag EV_PERSIST makes an event_add() persistent until
461 * event_del() has been called.
462 *
463 * Once initialized, the &event struct can be used repeatedly with
464 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000465 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000466 * when an ev structure has been added to libevent using event_add() the
467 * structure must persist until the event occurs (assuming EV_PERSIST
468 * is not set) or is removed using event_del(). You may not reuse the same
469 * ev structure for multiple monitored descriptors; each descriptor needs
470 * its own ev.
471 */
472 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000473 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000474
475 // Add the event
476 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000477 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000478 }
479}
480
481/**
482 * Closes a connection
483 */
484void TConnection::close() {
485 // Delete the registered libevent
486 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000487 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000488 }
489
490 // Close the socket
David Reiss01fe1532010-03-09 05:19:25 +0000491 if (socket_ >= 0) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000492 ::close(socket_);
493 }
David Reiss01fe1532010-03-09 05:19:25 +0000494 socket_ = -1;
Mark Slee2f6404d2006-10-10 01:37:40 +0000495
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000496 // close any factory produced transports
497 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000498 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000499
Mark Slee2f6404d2006-10-10 01:37:40 +0000500 // Give this object back to the server that owns it
501 server_->returnConnection(this);
502}
503
David Reiss01fe1532010-03-09 05:19:25 +0000504void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000505 if (readBufferSize_ > limit) {
506 readBufferSize_ = limit;
507 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
508 if (readBuffer_ == NULL) {
509 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
510 close();
511 }
512 }
513}
514
Mark Slee2f6404d2006-10-10 01:37:40 +0000515/**
516 * Creates a new connection either by reusing an object off the stack or
517 * by allocating a new one entirely
518 */
519TConnection* TNonblockingServer::createConnection(int socket, short flags) {
520 // Check the stack
521 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000522 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000523 } else {
524 TConnection* result = connectionStack_.top();
525 connectionStack_.pop();
526 result->init(socket, flags, this);
527 return result;
528 }
529}
530
531/**
532 * Returns a connection to the stack
533 */
534void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000535 if (connectionStackLimit_ &&
536 (connectionStack_.size() >= connectionStackLimit_)) {
537 delete connection;
538 } else {
539 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
540 connectionStack_.push(connection);
541 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000542}
543
544/**
David Reissa79e4882008-03-05 07:51:47 +0000545 * Server socket had something happen. We accept all waiting client
546 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000547 */
548void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000549 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000550 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000551
Mark Slee2f6404d2006-10-10 01:37:40 +0000552 // Server socket accepted a new connection
553 socklen_t addrLen;
554 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000555 addrLen = sizeof(addr);
556
Mark Slee2f6404d2006-10-10 01:37:40 +0000557 // Going to accept a new client socket
558 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000559
Mark Slee2f6404d2006-10-10 01:37:40 +0000560 // Accept as many new clients as possible, even though libevent signaled only
561 // one, this helps us to avoid having to go back into the libevent engine so
562 // many times
563 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000564 // If we're overloaded, take action here
565 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
566 nConnectionsDropped_++;
567 nTotalConnectionsDropped_++;
568 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
569 close(clientSocket);
570 continue;
571 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
572 if (!drainPendingTask()) {
573 // Nothing left to discard, so we drop connection instead.
574 close(clientSocket);
575 continue;
576 }
577 }
578 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000579 // Explicitly set this socket to NONBLOCK mode
580 int flags;
581 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
582 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000583 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000584 close(clientSocket);
585 return;
586 }
587
588 // Create a new TConnection for this client socket.
589 TConnection* clientConnection =
590 createConnection(clientSocket, EV_READ | EV_PERSIST);
591
592 // Fail fast if we could not create a TConnection object
593 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000594 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000595 close(clientSocket);
596 return;
597 }
598
599 // Put this client connection into the proper state
600 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000601
602 // addrLen is written by the accept() call, so needs to be set before the next call.
603 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000604 }
Mark Slee79b16942007-11-26 19:05:29 +0000605
Mark Slee2f6404d2006-10-10 01:37:40 +0000606 // Done looping accept, now we have to make sure the error is due to
607 // blocking. Any other error is a problem
608 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000609 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000610 }
611}
612
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_ and prefers an IPv6 result.
 * IPV6_V6ONLY is cleared on IPv6 sockets, so IPv4 clients can still connect
 * through mapped addresses. The socket is then created and bound, and
 * listenSocket(int) finishes the setup.
 *
 * @throws TException if address resolution, socket(), or bind() fails, or
 *         if listenSocket(int) fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any integer printed with "%d", so an out-of-range
  // port_ cannot overflow the buffer.
  char port[32];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail like the socket()/bind() errors below. Returning silently would
    // leave serverSocket_ unset and the server unable to accept anything.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Only meaningful for IPv6 sockets; on an IPv4 socket this setsockopt
  // would just fail and log a spurious error.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TNonblockingServer::listenSocket() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
675
676/**
677 * Takes a socket created by listenSocket() and sets various options on it
678 * to prepare for use in the server.
679 */
680void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000681 // Set socket to nonblocking mode
682 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000683 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
684 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
685 close(s);
686 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000687 }
688
689 int one = 1;
690 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000691
692 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000693 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000694
695 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000696 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000697
698 // Set TCP nodelay if available, MAC OS X Hack
699 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
700 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000701 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000702 #endif
703
Mark Slee79b16942007-11-26 19:05:29 +0000704 if (listen(s, LISTEN_BACKLOG) == -1) {
705 close(s);
706 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000707 }
708
Mark Slee79b16942007-11-26 19:05:29 +0000709 // Cool, this socket is good to go, set it as the serverSocket_
710 serverSocket_ = s;
711}
712
David Reiss01fe1532010-03-09 05:19:25 +0000713void TNonblockingServer::createNotificationPipe() {
714 if (pipe(notificationPipeFDs_) != 0) {
715 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
716 throw TException("can't create notification pipe");
717 }
718}
719
Mark Slee79b16942007-11-26 19:05:29 +0000720/**
721 * Register the core libevent events onto the proper base.
722 */
723void TNonblockingServer::registerEvents(event_base* base) {
724 assert(serverSocket_ != -1);
725 assert(!eventBase_);
726 eventBase_ = base;
727
728 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000729 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000730 event_get_version(),
731 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000732
733 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000734 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000735 serverSocket_,
736 EV_READ | EV_PERSIST,
737 TNonblockingServer::eventHandler,
738 this);
Mark Slee79b16942007-11-26 19:05:29 +0000739 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000740
741 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000742 if (-1 == event_add(&serverEvent_, 0)) {
743 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000744 }
David Reiss01fe1532010-03-09 05:19:25 +0000745 if (threadPoolProcessing_) {
746 // Create an event to be notified when a task finishes
747 event_set(&notificationEvent_,
748 getNotificationRecvFD(),
749 EV_READ | EV_PERSIST,
750 TConnection::taskHandler,
751 this);
752
753 // Attach to the base
754 event_base_set(eventBase_, &notificationEvent_);
755
756 // Add the event and start up the server
757 if (-1 == event_add(&notificationEvent_, 0)) {
758 throw TException("TNonblockingServer::serve(): notification event_add fail");
759 }
760 }
761}
762
763bool TNonblockingServer::serverOverloaded() {
764 size_t activeConnections = numTConnections_ - connectionStack_.size();
765 if (numActiveProcessors_ > maxActiveProcessors_ ||
766 activeConnections > maxConnections_) {
767 if (!overloaded_) {
768 GlobalOutput.printf("thrift non-blocking server overload condition");
769 overloaded_ = true;
770 }
771 } else {
772 if (overloaded_ &&
773 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
774 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
775 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
776 nConnectionsDropped_, nTotalConnectionsDropped_);
777 nConnectionsDropped_ = 0;
778 overloaded_ = false;
779 }
780 }
781
782 return overloaded_;
783}
784
785bool TNonblockingServer::drainPendingTask() {
786 if (threadManager_) {
787 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
788 if (task) {
789 TConnection* connection =
790 static_cast<TConnection::Task*>(task.get())->getTConnection();
791 assert(connection && connection->getServer()
792 && connection->getState() == APP_WAIT_TASK);
793 connection->forceClose();
794 return true;
795 }
796 }
797 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000798}
799
800/**
801 * Main workhorse function, starts up the server listening on a port and
802 * loops over the libevent handler.
803 */
804void TNonblockingServer::serve() {
805 // Init socket
806 listenSocket();
807
David Reiss01fe1532010-03-09 05:19:25 +0000808 if (threadPoolProcessing_) {
809 // Init task completion notification pipe
810 createNotificationPipe();
811 }
812
Mark Slee79b16942007-11-26 19:05:29 +0000813 // Initialize libevent core
814 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000815
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000816 // Run the preServe event
817 if (eventHandler_ != NULL) {
818 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000819 }
820
Mark Sleee02385b2007-06-09 01:21:16 +0000821 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000822 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000823}
824
T Jake Lucianib5e62212009-01-31 22:36:20 +0000825}}} // apache::thrift::server