blob: 85fe265755bcbcd873fb6ac751d4c392d0ac1a50 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss01fe1532010-03-09 05:19:25 +000055 connection_(connection) {}
Mark Sleee02385b2007-06-09 01:21:16 +000056
57 void run() {
58 try {
59 while (processor_->process(input_, output_)) {
60 if (!input_->getTransport()->peek()) {
61 break;
62 }
63 }
64 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000065 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000066 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000067 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000068 } catch (bad_alloc&) {
69 cerr << "TNonblockingServer caught bad_alloc exception.";
70 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000071 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000072 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000073 }
Mark Slee79b16942007-11-26 19:05:29 +000074
David Reiss01fe1532010-03-09 05:19:25 +000075 // Signal completion back to the libevent thread via a pipe
76 if (!connection_->notifyServer()) {
77 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000078 }
David Reiss01fe1532010-03-09 05:19:25 +000079 }
80
81 TConnection* getTConnection() {
82 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000083 }
84
85 private:
86 boost::shared_ptr<TProcessor> processor_;
87 boost::shared_ptr<TProtocol> input_;
88 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000089 TConnection* connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000090};
Mark Slee5ea15f92007-03-05 22:55:59 +000091
92void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000093 socket_ = socket;
94 server_ = s;
95 appState_ = APP_INIT;
96 eventFlags_ = 0;
97
98 readBufferPos_ = 0;
99 readWant_ = 0;
100
101 writeBuffer_ = NULL;
102 writeBufferSize_ = 0;
103 writeBufferPos_ = 0;
104
105 socketState_ = SOCKET_RECV;
106 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000107
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 // Set flags, which also registers the event
109 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000110
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000111 // get input/transports
112 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
113 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000114
115 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000116 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
117 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000118}
119
120void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000121 int flags=0, got=0, left=0, sent=0;
122 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000123
124 switch (socketState_) {
125 case SOCKET_RECV:
126 // It is an error to be in this state if we already have all the data
127 assert(readBufferPos_ < readWant_);
128
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000130 if (readWant_ > readBufferSize_) {
David Reiss472fffb2010-03-09 05:20:24 +0000131 uint32_t newSize = readBufferSize_;
132 while (readWant_ > newSize) {
133 newSize *= 2;
Mark Slee2f6404d2006-10-10 01:37:40 +0000134 }
David Reiss472fffb2010-03-09 05:20:24 +0000135 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
136 if (newBuffer == NULL) {
boz6ded7752007-06-05 22:41:18 +0000137 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000138 close();
139 return;
140 }
David Reiss472fffb2010-03-09 05:20:24 +0000141 readBuffer_ = newBuffer;
142 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 }
144
145 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000146 fetch = readWant_ - readBufferPos_;
147 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000148
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 if (got > 0) {
150 // Move along in the buffer
151 readBufferPos_ += got;
152
153 // Check that we did not overdo it
154 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000155
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 // We are done reading, move onto the next state
157 if (readBufferPos_ == readWant_) {
158 transition();
159 }
160 return;
161 } else if (got == -1) {
162 // Blocking errors are okay, just move on
163 if (errno == EAGAIN || errno == EWOULDBLOCK) {
164 return;
165 }
166
167 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000168 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000169 }
170 }
171
172 // Whenever we get down here it means a remote disconnect
173 close();
Mark Slee79b16942007-11-26 19:05:29 +0000174
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 return;
176
177 case SOCKET_SEND:
178 // Should never have position past size
179 assert(writeBufferPos_ <= writeBufferSize_);
180
181 // If there is no data to send, then let us move on
182 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000183 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000184 transition();
185 return;
186 }
187
188 flags = 0;
189 #ifdef MSG_NOSIGNAL
190 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
191 // check for the EPIPE return condition and close the socket in that case
192 flags |= MSG_NOSIGNAL;
193 #endif // ifdef MSG_NOSIGNAL
194
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000195 left = writeBufferSize_ - writeBufferPos_;
196 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000197
198 if (sent <= 0) {
199 // Blocking errors are okay, just move on
200 if (errno == EAGAIN || errno == EWOULDBLOCK) {
201 return;
202 }
203 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000204 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 }
206 close();
207 return;
208 }
209
210 writeBufferPos_ += sent;
211
212 // Did we overdo it?
213 assert(writeBufferPos_ <= writeBufferSize_);
214
Mark Slee79b16942007-11-26 19:05:29 +0000215 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000216 if (writeBufferPos_ == writeBufferSize_) {
217 transition();
218 }
219
220 return;
221
222 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000223 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 assert(0);
225 }
226}
227
228/**
229 * This is called when the application transitions from one state into
230 * another. This means that it has finished writing the data that it needed
231 * to, or finished receiving the data that it needed to.
232 */
233void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000234
235 int sz = 0;
236
Mark Slee2f6404d2006-10-10 01:37:40 +0000237 // Switch upon the state that we are currently in and move to a new state
238 switch (appState_) {
239
240 case APP_READ_REQUEST:
241 // We are done reading the request, package the read buffer into transport
242 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000243 // If we've used these transport buffers enough times, reset them to avoid bloating
244
Mark Slee2f6404d2006-10-10 01:37:40 +0000245 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000246 ++numReadsSinceReset_;
247 if (numWritesSinceReset_ < 512) {
248 outputTransport_->resetBuffer();
249 } else {
250 // reset the capacity of the output transport if we used it enough times that it might be bloated
251 try {
252 outputTransport_->resetBuffer(true);
253 numWritesSinceReset_ = 0;
254 } catch (TTransportException &ttx) {
255 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
256 close();
257 return;
258 }
259 }
260
David Reiss52cb7a72008-06-30 21:40:35 +0000261 // Prepend four bytes of blank space to the buffer so we can
262 // write the frame size there later.
263 outputTransport_->getWritePtr(4);
264 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000265
David Reiss01fe1532010-03-09 05:19:25 +0000266 server_->incrementActiveProcessors();
267
Mark Sleee02385b2007-06-09 01:21:16 +0000268 if (server_->isThreadPoolProcessing()) {
269 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000270
David Reiss01fe1532010-03-09 05:19:25 +0000271 // Create task and dispatch to the thread manager
272 boost::shared_ptr<Runnable> task =
273 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
274 inputProtocol_,
275 outputProtocol_,
276 this));
277 // The application is now waiting on the task to finish
278 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000279
David Reisse11f3072008-10-07 21:39:19 +0000280 try {
281 server_->addTask(task);
282 } catch (IllegalStateException & ise) {
283 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000284 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000285 close();
286 }
Mark Slee402ee282007-08-23 01:43:20 +0000287
David Reiss01fe1532010-03-09 05:19:25 +0000288 // Set this connection idle so that libevent doesn't process more
289 // data on it while we're still waiting for the threadmanager to
290 // finish this task
291 setIdle();
292 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000293 } else {
294 try {
295 // Invoke the processor
296 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
297 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000298 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000299 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000300 close();
301 return;
302 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000303 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000304 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000305 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000306 return;
307 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000308 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000309 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000310 close();
311 return;
312 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000313 }
314
Mark Slee402ee282007-08-23 01:43:20 +0000315 // Intentionally fall through here, the call to process has written into
316 // the writeBuffer_
317
Mark Sleee02385b2007-06-09 01:21:16 +0000318 case APP_WAIT_TASK:
319 // We have now finished processing a task and the result has been written
320 // into the outputTransport_, so we grab its contents and place them into
321 // the writeBuffer_ for actual writing by the libevent thread
322
David Reiss01fe1532010-03-09 05:19:25 +0000323 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 // Get the result of the operation
325 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
326
327 // If the function call generated return data, then move into the send
328 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000329 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000330 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000331
332 // Move into write state
333 writeBufferPos_ = 0;
334 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000335
David Reissaf787782008-07-03 20:29:34 +0000336 // Put the frame size into the write buffer
337 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
338 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000339
340 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000341 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000342 setWrite();
343
344 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000345 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000346
347 return;
348 }
349
David Reissc51986f2009-03-24 20:01:25 +0000350 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000352 goto LABEL_APP_INIT;
353
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 case APP_SEND_RESULT:
355
Kevin Clark5ace1782009-03-04 21:10:58 +0000356 ++numWritesSinceReset_;
357
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 // N.B.: We also intentionally fall through here into the INIT state!
359
Mark Slee92f00fb2006-10-25 01:28:17 +0000360 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 case APP_INIT:
362
Kevin Clark5ace1782009-03-04 21:10:58 +0000363 // reset the input buffer if we used it enough times that it might be bloated
364 if (numReadsSinceReset_ > 512)
365 {
366 void * new_buffer = std::realloc(readBuffer_, 1024);
367 if (new_buffer == NULL) {
368 GlobalOutput("TConnection::transition() realloc");
369 close();
370 return;
371 }
372 readBuffer_ = (uint8_t*) new_buffer;
373 readBufferSize_ = 1024;
374 numReadsSinceReset_ = 0;
375 }
376
Mark Slee2f6404d2006-10-10 01:37:40 +0000377 // Clear write buffer variables
378 writeBuffer_ = NULL;
379 writeBufferPos_ = 0;
380 writeBufferSize_ = 0;
381
382 // Set up read buffer for getting 4 bytes
383 readBufferPos_ = 0;
384 readWant_ = 4;
385
386 // Into read4 state we go
387 socketState_ = SOCKET_RECV;
388 appState_ = APP_READ_FRAME_SIZE;
389
390 // Register read event
391 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000392
Mark Slee2f6404d2006-10-10 01:37:40 +0000393 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000394 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000395
396 return;
397
398 case APP_READ_FRAME_SIZE:
399 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000400 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000401 sz = (int32_t)ntohl(sz);
402
403 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000404 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 close();
406 return;
407 }
408
409 // Reset the read buffer
410 readWant_ = (uint32_t)sz;
411 readBufferPos_= 0;
412
413 // Move into read request state
414 appState_ = APP_READ_REQUEST;
415
416 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000417 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000418
419 return;
420
David Reiss01fe1532010-03-09 05:19:25 +0000421 case APP_CLOSE_CONNECTION:
422 server_->decrementActiveProcessors();
423 close();
424 return;
425
Mark Slee2f6404d2006-10-10 01:37:40 +0000426 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000427 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000428 assert(0);
429 }
430}
431
432void TConnection::setFlags(short eventFlags) {
433 // Catch the do nothing case
434 if (eventFlags_ == eventFlags) {
435 return;
436 }
437
438 // Delete a previously existing event
439 if (eventFlags_ != 0) {
440 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000441 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000442 return;
443 }
444 }
445
446 // Update in memory structure
447 eventFlags_ = eventFlags;
448
Mark Slee402ee282007-08-23 01:43:20 +0000449 // Do not call event_set if there are no flags
450 if (!eventFlags_) {
451 return;
452 }
453
David Reiss01fe1532010-03-09 05:19:25 +0000454 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000455 * event_set:
456 *
457 * Prepares the event structure &event to be used in future calls to
458 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000459 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 *
461 * The events can be either EV_READ, EV_WRITE, or both, indicating
462 * that an application can read or write from the file respectively without
463 * blocking.
464 *
Mark Sleee02385b2007-06-09 01:21:16 +0000465 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000466 * the event and the type of event which will be one of: EV_TIMEOUT,
467 * EV_SIGNAL, EV_READ, EV_WRITE.
468 *
469 * The additional flag EV_PERSIST makes an event_add() persistent until
470 * event_del() has been called.
471 *
472 * Once initialized, the &event struct can be used repeatedly with
473 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000474 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000475 * when an ev structure has been added to libevent using event_add() the
476 * structure must persist until the event occurs (assuming EV_PERSIST
477 * is not set) or is removed using event_del(). You may not reuse the same
478 * ev structure for multiple monitored descriptors; each descriptor needs
479 * its own ev.
480 */
481 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000482 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000483
484 // Add the event
485 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000486 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000487 }
488}
489
490/**
491 * Closes a connection
492 */
493void TConnection::close() {
494 // Delete the registered libevent
495 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000496 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000497 }
498
499 // Close the socket
David Reiss01fe1532010-03-09 05:19:25 +0000500 if (socket_ >= 0) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 ::close(socket_);
502 }
David Reiss01fe1532010-03-09 05:19:25 +0000503 socket_ = -1;
Mark Slee2f6404d2006-10-10 01:37:40 +0000504
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000505 // close any factory produced transports
506 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000507 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000508
Mark Slee2f6404d2006-10-10 01:37:40 +0000509 // Give this object back to the server that owns it
510 server_->returnConnection(this);
511}
512
David Reiss01fe1532010-03-09 05:19:25 +0000513void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000514 if (readBufferSize_ > limit) {
515 readBufferSize_ = limit;
516 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
517 if (readBuffer_ == NULL) {
518 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
519 close();
520 }
521 }
522}
523
David Reiss8ede8182010-09-02 15:26:28 +0000524TNonblockingServer::~TNonblockingServer() {
525 // TODO: We currently leak any active TConnection objects.
526 // Since we're shutting down and destroying the event_base, the TConnection
527 // objects will never receive any additional callbacks. (And even if they
528 // did, it would be bad, since they keep a pointer around to the server,
529 // which is being destroyed.)
530
531 // Clean up unused TConnection objects in connectionStack_
532 while (!connectionStack_.empty()) {
533 TConnection* connection = connectionStack_.top();
534 connectionStack_.pop();
535 delete connection;
536 }
537
538 if (eventBase_) {
539 event_base_free(eventBase_);
540 }
541
542 if (serverSocket_ >= 0) {
543 close(serverSocket_);
544 }
545}
546
Mark Slee2f6404d2006-10-10 01:37:40 +0000547/**
548 * Creates a new connection either by reusing an object off the stack or
549 * by allocating a new one entirely
550 */
551TConnection* TNonblockingServer::createConnection(int socket, short flags) {
552 // Check the stack
553 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000554 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000555 } else {
556 TConnection* result = connectionStack_.top();
557 connectionStack_.pop();
558 result->init(socket, flags, this);
559 return result;
560 }
561}
562
563/**
564 * Returns a connection to the stack
565 */
566void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000567 if (connectionStackLimit_ &&
568 (connectionStack_.size() >= connectionStackLimit_)) {
569 delete connection;
570 } else {
571 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
572 connectionStack_.push(connection);
573 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000574}
575
576/**
David Reissa79e4882008-03-05 07:51:47 +0000577 * Server socket had something happen. We accept all waiting client
578 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000579 */
580void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000581 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000582 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000583
Mark Slee2f6404d2006-10-10 01:37:40 +0000584 // Server socket accepted a new connection
585 socklen_t addrLen;
586 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000587 addrLen = sizeof(addr);
588
Mark Slee2f6404d2006-10-10 01:37:40 +0000589 // Going to accept a new client socket
590 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000591
Mark Slee2f6404d2006-10-10 01:37:40 +0000592 // Accept as many new clients as possible, even though libevent signaled only
593 // one, this helps us to avoid having to go back into the libevent engine so
594 // many times
595 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000596 // If we're overloaded, take action here
597 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
598 nConnectionsDropped_++;
599 nTotalConnectionsDropped_++;
600 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
601 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000602 return;
David Reiss01fe1532010-03-09 05:19:25 +0000603 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
604 if (!drainPendingTask()) {
605 // Nothing left to discard, so we drop connection instead.
606 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000607 return;
David Reiss01fe1532010-03-09 05:19:25 +0000608 }
609 }
610 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000611 // Explicitly set this socket to NONBLOCK mode
612 int flags;
613 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
614 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000615 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 close(clientSocket);
617 return;
618 }
619
620 // Create a new TConnection for this client socket.
621 TConnection* clientConnection =
622 createConnection(clientSocket, EV_READ | EV_PERSIST);
623
624 // Fail fast if we could not create a TConnection object
625 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000626 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 close(clientSocket);
628 return;
629 }
630
631 // Put this client connection into the proper state
632 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000633
634 // addrLen is written by the accept() call, so needs to be set before the next call.
635 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000636 }
Mark Slee79b16942007-11-26 19:05:29 +0000637
Mark Slee2f6404d2006-10-10 01:37:40 +0000638 // Done looping accept, now we have to make sure the error is due to
639 // blocking. Any other error is a problem
640 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000641 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000642 }
643}
644
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * The wildcard address for port_ is resolved with getaddrinfo(); an IPv6
 * result is preferred when one exists. The bound socket is then passed to
 * listenSocket(int) to be prepared for listening.
 *
 * If getaddrinfo() fails the error is logged and the function returns
 * without creating a socket.
 *
 * @throws TException if port_ cannot be formatted as a port string, or if
 *                    socket() or bind() fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // Bounded formatting: an out-of-range port_ must not overrun the buffer
  const int written = snprintf(port, sizeof(port), "%d", port_);
  if (written < 0 || (size_t)written >= sizeof(port)) {
    throw TException("TNonblockingServer::serve() invalid port");
  }

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. If there is none, this stops on the last entry.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY on the IPv6 socket; a failure is only logged
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
709
710/**
711 * Takes a socket created by listenSocket() and sets various options on it
712 * to prepare for use in the server.
713 */
714void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000715 // Set socket to nonblocking mode
716 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000717 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
718 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
719 close(s);
720 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000721 }
722
723 int one = 1;
724 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000725
726 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000727 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000728
729 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000730 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000731
732 // Set TCP nodelay if available, MAC OS X Hack
733 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
734 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000735 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000736 #endif
737
David Reiss1c20c872010-03-09 05:20:14 +0000738 #ifdef TCP_LOW_MIN_RTO
739 if (TSocket::getUseLowMinRto()) {
740 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
741 }
742 #endif
743
Mark Slee79b16942007-11-26 19:05:29 +0000744 if (listen(s, LISTEN_BACKLOG) == -1) {
745 close(s);
746 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000747 }
748
Mark Slee79b16942007-11-26 19:05:29 +0000749 // Cool, this socket is good to go, set it as the serverSocket_
750 serverSocket_ = s;
751}
752
David Reiss01fe1532010-03-09 05:19:25 +0000753void TNonblockingServer::createNotificationPipe() {
754 if (pipe(notificationPipeFDs_) != 0) {
755 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
756 throw TException("can't create notification pipe");
757 }
David Reiss83b8fda2010-03-09 05:19:34 +0000758 int flags;
759 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
760 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
761 close(notificationPipeFDs_[0]);
762 close(notificationPipeFDs_[1]);
763 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
764 }
David Reiss01fe1532010-03-09 05:19:25 +0000765}
766
Mark Slee79b16942007-11-26 19:05:29 +0000767/**
768 * Register the core libevent events onto the proper base.
769 */
770void TNonblockingServer::registerEvents(event_base* base) {
771 assert(serverSocket_ != -1);
772 assert(!eventBase_);
773 eventBase_ = base;
774
775 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000776 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000777 event_get_version(),
778 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000779
780 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000781 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000782 serverSocket_,
783 EV_READ | EV_PERSIST,
784 TNonblockingServer::eventHandler,
785 this);
Mark Slee79b16942007-11-26 19:05:29 +0000786 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000787
788 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000789 if (-1 == event_add(&serverEvent_, 0)) {
790 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000791 }
David Reiss01fe1532010-03-09 05:19:25 +0000792 if (threadPoolProcessing_) {
793 // Create an event to be notified when a task finishes
794 event_set(&notificationEvent_,
795 getNotificationRecvFD(),
796 EV_READ | EV_PERSIST,
797 TConnection::taskHandler,
798 this);
David Reiss1c20c872010-03-09 05:20:14 +0000799
David Reiss01fe1532010-03-09 05:19:25 +0000800 // Attach to the base
801 event_base_set(eventBase_, &notificationEvent_);
802
803 // Add the event and start up the server
804 if (-1 == event_add(&notificationEvent_, 0)) {
805 throw TException("TNonblockingServer::serve(): notification event_add fail");
806 }
807 }
808}
809
David Reiss068f4162010-03-09 05:19:45 +0000810void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
811 threadManager_ = threadManager;
812 if (threadManager != NULL) {
813 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
814 threadPoolProcessing_ = true;
815 } else {
816 threadPoolProcessing_ = false;
817 }
818}
819
David Reiss01fe1532010-03-09 05:19:25 +0000820bool TNonblockingServer::serverOverloaded() {
821 size_t activeConnections = numTConnections_ - connectionStack_.size();
822 if (numActiveProcessors_ > maxActiveProcessors_ ||
823 activeConnections > maxConnections_) {
824 if (!overloaded_) {
825 GlobalOutput.printf("thrift non-blocking server overload condition");
826 overloaded_ = true;
827 }
828 } else {
829 if (overloaded_ &&
830 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
831 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
832 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
833 nConnectionsDropped_, nTotalConnectionsDropped_);
834 nConnectionsDropped_ = 0;
835 overloaded_ = false;
836 }
837 }
838
839 return overloaded_;
840}
841
842bool TNonblockingServer::drainPendingTask() {
843 if (threadManager_) {
844 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
845 if (task) {
846 TConnection* connection =
847 static_cast<TConnection::Task*>(task.get())->getTConnection();
848 assert(connection && connection->getServer()
849 && connection->getState() == APP_WAIT_TASK);
850 connection->forceClose();
851 return true;
852 }
853 }
854 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000855}
856
David Reiss068f4162010-03-09 05:19:45 +0000857void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
858 TConnection* connection =
859 static_cast<TConnection::Task*>(task.get())->getTConnection();
860 assert(connection && connection->getServer()
861 && connection->getState() == APP_WAIT_TASK);
862 connection->forceClose();
863}
864
Mark Slee79b16942007-11-26 19:05:29 +0000865/**
866 * Main workhorse function, starts up the server listening on a port and
867 * loops over the libevent handler.
868 */
869void TNonblockingServer::serve() {
870 // Init socket
871 listenSocket();
872
David Reiss01fe1532010-03-09 05:19:25 +0000873 if (threadPoolProcessing_) {
874 // Init task completion notification pipe
875 createNotificationPipe();
876 }
877
Mark Slee79b16942007-11-26 19:05:29 +0000878 // Initialize libevent core
879 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000880
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000881 // Run the preServe event
882 if (eventHandler_ != NULL) {
883 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000884 }
885
Mark Sleee02385b2007-06-09 01:21:16 +0000886 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000887 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000888}
889
T Jake Lucianib5e62212009-01-31 22:36:20 +0000890}}} // apache::thrift::server