blob: 26e97c967d04027066bf0ad139aa6810df3be53f [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000022
Mark Sleee02385b2007-06-09 01:21:16 +000023#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000024#include <sys/socket.h>
25#include <netinet/in.h>
26#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000027#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000028#include <fcntl.h>
29#include <errno.h>
30#include <assert.h>
31
David Reiss9b903442009-10-21 05:51:28 +000032#ifndef AF_LOCAL
33#define AF_LOCAL AF_UNIX
34#endif
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::protocol;
39using namespace apache::thrift::transport;
40using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000041using namespace std;
42
43class TConnection::Task: public Runnable {
44 public:
45 Task(boost::shared_ptr<TProcessor> processor,
46 boost::shared_ptr<TProtocol> input,
47 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000048 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000049 processor_(processor),
50 input_(input),
51 output_(output),
David Reiss01fe1532010-03-09 05:19:25 +000052 connection_(connection) {}
Mark Sleee02385b2007-06-09 01:21:16 +000053
54 void run() {
55 try {
56 while (processor_->process(input_, output_)) {
57 if (!input_->getTransport()->peek()) {
58 break;
59 }
60 }
61 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000062 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000063 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000064 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000065 } catch (bad_alloc&) {
66 cerr << "TNonblockingServer caught bad_alloc exception.";
67 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000068 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000069 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000070 }
Mark Slee79b16942007-11-26 19:05:29 +000071
David Reiss01fe1532010-03-09 05:19:25 +000072 // Signal completion back to the libevent thread via a pipe
73 if (!connection_->notifyServer()) {
74 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000075 }
David Reiss01fe1532010-03-09 05:19:25 +000076 }
77
78 TConnection* getTConnection() {
79 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000080 }
81
82 private:
83 boost::shared_ptr<TProcessor> processor_;
84 boost::shared_ptr<TProtocol> input_;
85 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000086 TConnection* connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000087};
Mark Slee5ea15f92007-03-05 22:55:59 +000088
89void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000090 socket_ = socket;
91 server_ = s;
92 appState_ = APP_INIT;
93 eventFlags_ = 0;
94
95 readBufferPos_ = 0;
96 readWant_ = 0;
97
98 writeBuffer_ = NULL;
99 writeBufferSize_ = 0;
100 writeBufferPos_ = 0;
101
102 socketState_ = SOCKET_RECV;
103 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 // Set flags, which also registers the event
106 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000107
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000108 // get input/transports
109 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
110 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000111
112 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000113 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
114 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000115}
116
117void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000118 int flags=0, got=0, left=0, sent=0;
119 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000120
121 switch (socketState_) {
122 case SOCKET_RECV:
123 // It is an error to be in this state if we already have all the data
124 assert(readBufferPos_ < readWant_);
125
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000127 if (readWant_ > readBufferSize_) {
128 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 readBufferSize_ *= 2;
130 }
David Reissd7a16f42008-02-19 22:47:29 +0000131 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000133 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000134 close();
135 return;
136 }
137 }
138
139 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000140 fetch = readWant_ - readBufferPos_;
141 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000142
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 if (got > 0) {
144 // Move along in the buffer
145 readBufferPos_ += got;
146
147 // Check that we did not overdo it
148 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000149
Mark Slee2f6404d2006-10-10 01:37:40 +0000150 // We are done reading, move onto the next state
151 if (readBufferPos_ == readWant_) {
152 transition();
153 }
154 return;
155 } else if (got == -1) {
156 // Blocking errors are okay, just move on
157 if (errno == EAGAIN || errno == EWOULDBLOCK) {
158 return;
159 }
160
161 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000162 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 }
164 }
165
166 // Whenever we get down here it means a remote disconnect
167 close();
Mark Slee79b16942007-11-26 19:05:29 +0000168
Mark Slee2f6404d2006-10-10 01:37:40 +0000169 return;
170
171 case SOCKET_SEND:
172 // Should never have position past size
173 assert(writeBufferPos_ <= writeBufferSize_);
174
175 // If there is no data to send, then let us move on
176 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000177 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 transition();
179 return;
180 }
181
182 flags = 0;
183 #ifdef MSG_NOSIGNAL
184 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
185 // check for the EPIPE return condition and close the socket in that case
186 flags |= MSG_NOSIGNAL;
187 #endif // ifdef MSG_NOSIGNAL
188
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000189 left = writeBufferSize_ - writeBufferPos_;
190 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000191
192 if (sent <= 0) {
193 // Blocking errors are okay, just move on
194 if (errno == EAGAIN || errno == EWOULDBLOCK) {
195 return;
196 }
197 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000198 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 }
200 close();
201 return;
202 }
203
204 writeBufferPos_ += sent;
205
206 // Did we overdo it?
207 assert(writeBufferPos_ <= writeBufferSize_);
208
Mark Slee79b16942007-11-26 19:05:29 +0000209 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 if (writeBufferPos_ == writeBufferSize_) {
211 transition();
212 }
213
214 return;
215
216 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000217 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000218 assert(0);
219 }
220}
221
222/**
223 * This is called when the application transitions from one state into
224 * another. This means that it has finished writing the data that it needed
225 * to, or finished receiving the data that it needed to.
226 */
227void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000228
229 int sz = 0;
230
Mark Slee2f6404d2006-10-10 01:37:40 +0000231 // Switch upon the state that we are currently in and move to a new state
232 switch (appState_) {
233
234 case APP_READ_REQUEST:
235 // We are done reading the request, package the read buffer into transport
236 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000237 // If we've used these transport buffers enough times, reset them to avoid bloating
238
Mark Slee2f6404d2006-10-10 01:37:40 +0000239 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000240 ++numReadsSinceReset_;
241 if (numWritesSinceReset_ < 512) {
242 outputTransport_->resetBuffer();
243 } else {
244 // reset the capacity of the output transport if we used it enough times that it might be bloated
245 try {
246 outputTransport_->resetBuffer(true);
247 numWritesSinceReset_ = 0;
248 } catch (TTransportException &ttx) {
249 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
250 close();
251 return;
252 }
253 }
254
David Reiss52cb7a72008-06-30 21:40:35 +0000255 // Prepend four bytes of blank space to the buffer so we can
256 // write the frame size there later.
257 outputTransport_->getWritePtr(4);
258 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000259
David Reiss01fe1532010-03-09 05:19:25 +0000260 server_->incrementActiveProcessors();
261
Mark Sleee02385b2007-06-09 01:21:16 +0000262 if (server_->isThreadPoolProcessing()) {
263 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000264
David Reiss01fe1532010-03-09 05:19:25 +0000265 // Create task and dispatch to the thread manager
266 boost::shared_ptr<Runnable> task =
267 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
268 inputProtocol_,
269 outputProtocol_,
270 this));
271 // The application is now waiting on the task to finish
272 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000273
David Reisse11f3072008-10-07 21:39:19 +0000274 try {
275 server_->addTask(task);
276 } catch (IllegalStateException & ise) {
277 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000278 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000279 close();
280 }
Mark Slee402ee282007-08-23 01:43:20 +0000281
David Reiss01fe1532010-03-09 05:19:25 +0000282 // Set this connection idle so that libevent doesn't process more
283 // data on it while we're still waiting for the threadmanager to
284 // finish this task
285 setIdle();
286 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000287 } else {
288 try {
289 // Invoke the processor
290 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
291 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000292 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000293 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000294 close();
295 return;
296 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000297 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000298 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000299 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000300 return;
301 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000302 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000303 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000304 close();
305 return;
306 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000307 }
308
Mark Slee402ee282007-08-23 01:43:20 +0000309 // Intentionally fall through here, the call to process has written into
310 // the writeBuffer_
311
Mark Sleee02385b2007-06-09 01:21:16 +0000312 case APP_WAIT_TASK:
313 // We have now finished processing a task and the result has been written
314 // into the outputTransport_, so we grab its contents and place them into
315 // the writeBuffer_ for actual writing by the libevent thread
316
David Reiss01fe1532010-03-09 05:19:25 +0000317 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000318 // Get the result of the operation
319 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
320
321 // If the function call generated return data, then move into the send
322 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000323 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000324 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000325
326 // Move into write state
327 writeBufferPos_ = 0;
328 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000329
David Reissaf787782008-07-03 20:29:34 +0000330 // Put the frame size into the write buffer
331 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
332 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000333
334 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000335 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000336 setWrite();
337
338 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000339 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000340
341 return;
342 }
343
David Reissc51986f2009-03-24 20:01:25 +0000344 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000345 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000346 goto LABEL_APP_INIT;
347
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 case APP_SEND_RESULT:
349
Kevin Clark5ace1782009-03-04 21:10:58 +0000350 ++numWritesSinceReset_;
351
Mark Slee2f6404d2006-10-10 01:37:40 +0000352 // N.B.: We also intentionally fall through here into the INIT state!
353
Mark Slee92f00fb2006-10-25 01:28:17 +0000354 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000355 case APP_INIT:
356
Kevin Clark5ace1782009-03-04 21:10:58 +0000357 // reset the input buffer if we used it enough times that it might be bloated
358 if (numReadsSinceReset_ > 512)
359 {
360 void * new_buffer = std::realloc(readBuffer_, 1024);
361 if (new_buffer == NULL) {
362 GlobalOutput("TConnection::transition() realloc");
363 close();
364 return;
365 }
366 readBuffer_ = (uint8_t*) new_buffer;
367 readBufferSize_ = 1024;
368 numReadsSinceReset_ = 0;
369 }
370
Mark Slee2f6404d2006-10-10 01:37:40 +0000371 // Clear write buffer variables
372 writeBuffer_ = NULL;
373 writeBufferPos_ = 0;
374 writeBufferSize_ = 0;
375
376 // Set up read buffer for getting 4 bytes
377 readBufferPos_ = 0;
378 readWant_ = 4;
379
380 // Into read4 state we go
381 socketState_ = SOCKET_RECV;
382 appState_ = APP_READ_FRAME_SIZE;
383
384 // Register read event
385 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000386
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000388 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000389
390 return;
391
392 case APP_READ_FRAME_SIZE:
393 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000394 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000395 sz = (int32_t)ntohl(sz);
396
397 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000398 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000399 close();
400 return;
401 }
402
403 // Reset the read buffer
404 readWant_ = (uint32_t)sz;
405 readBufferPos_= 0;
406
407 // Move into read request state
408 appState_ = APP_READ_REQUEST;
409
410 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000411 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000412
413 return;
414
David Reiss01fe1532010-03-09 05:19:25 +0000415 case APP_CLOSE_CONNECTION:
416 server_->decrementActiveProcessors();
417 close();
418 return;
419
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000421 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000422 assert(0);
423 }
424}
425
426void TConnection::setFlags(short eventFlags) {
427 // Catch the do nothing case
428 if (eventFlags_ == eventFlags) {
429 return;
430 }
431
432 // Delete a previously existing event
433 if (eventFlags_ != 0) {
434 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000435 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000436 return;
437 }
438 }
439
440 // Update in memory structure
441 eventFlags_ = eventFlags;
442
Mark Slee402ee282007-08-23 01:43:20 +0000443 // Do not call event_set if there are no flags
444 if (!eventFlags_) {
445 return;
446 }
447
David Reiss01fe1532010-03-09 05:19:25 +0000448 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000449 * event_set:
450 *
451 * Prepares the event structure &event to be used in future calls to
452 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000453 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 *
455 * The events can be either EV_READ, EV_WRITE, or both, indicating
456 * that an application can read or write from the file respectively without
457 * blocking.
458 *
Mark Sleee02385b2007-06-09 01:21:16 +0000459 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 * the event and the type of event which will be one of: EV_TIMEOUT,
461 * EV_SIGNAL, EV_READ, EV_WRITE.
462 *
463 * The additional flag EV_PERSIST makes an event_add() persistent until
464 * event_del() has been called.
465 *
466 * Once initialized, the &event struct can be used repeatedly with
467 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000468 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000469 * when an ev structure has been added to libevent using event_add() the
470 * structure must persist until the event occurs (assuming EV_PERSIST
471 * is not set) or is removed using event_del(). You may not reuse the same
472 * ev structure for multiple monitored descriptors; each descriptor needs
473 * its own ev.
474 */
475 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000476 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000477
478 // Add the event
479 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000480 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000481 }
482}
483
484/**
485 * Closes a connection
486 */
487void TConnection::close() {
488 // Delete the registered libevent
489 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000490 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000491 }
492
493 // Close the socket
David Reiss01fe1532010-03-09 05:19:25 +0000494 if (socket_ >= 0) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000495 ::close(socket_);
496 }
David Reiss01fe1532010-03-09 05:19:25 +0000497 socket_ = -1;
Mark Slee2f6404d2006-10-10 01:37:40 +0000498
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000499 // close any factory produced transports
500 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000501 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000502
Mark Slee2f6404d2006-10-10 01:37:40 +0000503 // Give this object back to the server that owns it
504 server_->returnConnection(this);
505}
506
David Reiss01fe1532010-03-09 05:19:25 +0000507void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000508 if (readBufferSize_ > limit) {
509 readBufferSize_ = limit;
510 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
511 if (readBuffer_ == NULL) {
512 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
513 close();
514 }
515 }
516}
517
Mark Slee2f6404d2006-10-10 01:37:40 +0000518/**
519 * Creates a new connection either by reusing an object off the stack or
520 * by allocating a new one entirely
521 */
522TConnection* TNonblockingServer::createConnection(int socket, short flags) {
523 // Check the stack
524 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000525 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000526 } else {
527 TConnection* result = connectionStack_.top();
528 connectionStack_.pop();
529 result->init(socket, flags, this);
530 return result;
531 }
532}
533
534/**
535 * Returns a connection to the stack
536 */
537void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000538 if (connectionStackLimit_ &&
539 (connectionStack_.size() >= connectionStackLimit_)) {
540 delete connection;
541 } else {
542 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
543 connectionStack_.push(connection);
544 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000545}
546
547/**
David Reissa79e4882008-03-05 07:51:47 +0000548 * Server socket had something happen. We accept all waiting client
549 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000550 */
551void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000552 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000553 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000554
Mark Slee2f6404d2006-10-10 01:37:40 +0000555 // Server socket accepted a new connection
556 socklen_t addrLen;
557 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000558 addrLen = sizeof(addr);
559
Mark Slee2f6404d2006-10-10 01:37:40 +0000560 // Going to accept a new client socket
561 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000562
Mark Slee2f6404d2006-10-10 01:37:40 +0000563 // Accept as many new clients as possible, even though libevent signaled only
564 // one, this helps us to avoid having to go back into the libevent engine so
565 // many times
566 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000567 // If we're overloaded, take action here
568 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
569 nConnectionsDropped_++;
570 nTotalConnectionsDropped_++;
571 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
572 close(clientSocket);
573 continue;
574 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
575 if (!drainPendingTask()) {
576 // Nothing left to discard, so we drop connection instead.
577 close(clientSocket);
578 continue;
579 }
580 }
581 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000582 // Explicitly set this socket to NONBLOCK mode
583 int flags;
584 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
585 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000586 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000587 close(clientSocket);
588 return;
589 }
590
591 // Create a new TConnection for this client socket.
592 TConnection* clientConnection =
593 createConnection(clientSocket, EV_READ | EV_PERSIST);
594
595 // Fail fast if we could not create a TConnection object
596 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000597 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000598 close(clientSocket);
599 return;
600 }
601
602 // Put this client connection into the proper state
603 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000604
605 // addrLen is written by the accept() call, so needs to be set before the next call.
606 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000607 }
Mark Slee79b16942007-11-26 19:05:29 +0000608
Mark Slee2f6404d2006-10-10 01:37:40 +0000609 // Done looping accept, now we have to make sure the error is due to
610 // blocking. Any other error is a problem
611 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000612 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000613 }
614}
615
/**
 * Creates a socket to listen on and binds it to the local port, then passes
 * it to listenSocket(int) to finish the setup.
 *
 * @throws TException if address resolution, socket creation or bind fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Bounded formatting: an out-of-range port_ value cannot overrun `port`
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Report the failure the same way as every other failure in this
    // function; returning silently would leave the server without a socket.
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // The option only applies to IPv6 sockets; setting it on any other
  // family fails and would log a misleading error.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    ::close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
678
679/**
680 * Takes a socket created by listenSocket() and sets various options on it
681 * to prepare for use in the server.
682 */
683void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000684 // Set socket to nonblocking mode
685 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000686 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
687 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
688 close(s);
689 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000690 }
691
692 int one = 1;
693 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000694
695 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000696 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000697
698 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000699 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000700
701 // Set TCP nodelay if available, MAC OS X Hack
702 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
703 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000704 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000705 #endif
706
Mark Slee79b16942007-11-26 19:05:29 +0000707 if (listen(s, LISTEN_BACKLOG) == -1) {
708 close(s);
709 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000710 }
711
Mark Slee79b16942007-11-26 19:05:29 +0000712 // Cool, this socket is good to go, set it as the serverSocket_
713 serverSocket_ = s;
714}
715
David Reiss01fe1532010-03-09 05:19:25 +0000716void TNonblockingServer::createNotificationPipe() {
717 if (pipe(notificationPipeFDs_) != 0) {
718 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
719 throw TException("can't create notification pipe");
720 }
721}
722
Mark Slee79b16942007-11-26 19:05:29 +0000723/**
724 * Register the core libevent events onto the proper base.
725 */
726void TNonblockingServer::registerEvents(event_base* base) {
727 assert(serverSocket_ != -1);
728 assert(!eventBase_);
729 eventBase_ = base;
730
731 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000732 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000733 event_get_version(),
734 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000735
736 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000737 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000738 serverSocket_,
739 EV_READ | EV_PERSIST,
740 TNonblockingServer::eventHandler,
741 this);
Mark Slee79b16942007-11-26 19:05:29 +0000742 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000743
744 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000745 if (-1 == event_add(&serverEvent_, 0)) {
746 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000747 }
David Reiss01fe1532010-03-09 05:19:25 +0000748 if (threadPoolProcessing_) {
749 // Create an event to be notified when a task finishes
750 event_set(&notificationEvent_,
751 getNotificationRecvFD(),
752 EV_READ | EV_PERSIST,
753 TConnection::taskHandler,
754 this);
755
756 // Attach to the base
757 event_base_set(eventBase_, &notificationEvent_);
758
759 // Add the event and start up the server
760 if (-1 == event_add(&notificationEvent_, 0)) {
761 throw TException("TNonblockingServer::serve(): notification event_add fail");
762 }
763 }
764}
765
766bool TNonblockingServer::serverOverloaded() {
767 size_t activeConnections = numTConnections_ - connectionStack_.size();
768 if (numActiveProcessors_ > maxActiveProcessors_ ||
769 activeConnections > maxConnections_) {
770 if (!overloaded_) {
771 GlobalOutput.printf("thrift non-blocking server overload condition");
772 overloaded_ = true;
773 }
774 } else {
775 if (overloaded_ &&
776 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
777 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
778 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
779 nConnectionsDropped_, nTotalConnectionsDropped_);
780 nConnectionsDropped_ = 0;
781 overloaded_ = false;
782 }
783 }
784
785 return overloaded_;
786}
787
788bool TNonblockingServer::drainPendingTask() {
789 if (threadManager_) {
790 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
791 if (task) {
792 TConnection* connection =
793 static_cast<TConnection::Task*>(task.get())->getTConnection();
794 assert(connection && connection->getServer()
795 && connection->getState() == APP_WAIT_TASK);
796 connection->forceClose();
797 return true;
798 }
799 }
800 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000801}
802
803/**
804 * Main workhorse function, starts up the server listening on a port and
805 * loops over the libevent handler.
806 */
807void TNonblockingServer::serve() {
808 // Init socket
809 listenSocket();
810
David Reiss01fe1532010-03-09 05:19:25 +0000811 if (threadPoolProcessing_) {
812 // Init task completion notification pipe
813 createNotificationPipe();
814 }
815
Mark Slee79b16942007-11-26 19:05:29 +0000816 // Initialize libevent core
817 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000818
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000819 // Run the preServe event
820 if (eventHandler_ != NULL) {
821 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000822 }
823
Mark Sleee02385b2007-06-09 01:21:16 +0000824 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000825 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000826}
827
T Jake Lucianib5e62212009-01-31 22:36:20 +0000828}}} // apache::thrift::server