blob: 8f68d744425ba08dfd9c2a2952cd1e9b4f33d5c2 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000025#include <sys/socket.h>
26#include <netinet/in.h>
27#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000028#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <fcntl.h>
30#include <errno.h>
31#include <assert.h>
32
David Reiss9b903442009-10-21 05:51:28 +000033#ifndef AF_LOCAL
34#define AF_LOCAL AF_UNIX
35#endif
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000038
T Jake Lucianib5e62212009-01-31 22:36:20 +000039using namespace apache::thrift::protocol;
40using namespace apache::thrift::transport;
41using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000042using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000043using apache::thrift::transport::TSocket;
44using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000045
46class TConnection::Task: public Runnable {
47 public:
48 Task(boost::shared_ptr<TProcessor> processor,
49 boost::shared_ptr<TProtocol> input,
50 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000051 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000052 processor_(processor),
53 input_(input),
54 output_(output),
David Reiss01fe1532010-03-09 05:19:25 +000055 connection_(connection) {}
Mark Sleee02385b2007-06-09 01:21:16 +000056
57 void run() {
58 try {
59 while (processor_->process(input_, output_)) {
60 if (!input_->getTransport()->peek()) {
61 break;
62 }
63 }
64 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000065 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000066 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000067 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000068 } catch (bad_alloc&) {
69 cerr << "TNonblockingServer caught bad_alloc exception.";
70 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000071 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000072 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000073 }
Mark Slee79b16942007-11-26 19:05:29 +000074
David Reiss01fe1532010-03-09 05:19:25 +000075 // Signal completion back to the libevent thread via a pipe
76 if (!connection_->notifyServer()) {
77 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000078 }
David Reiss01fe1532010-03-09 05:19:25 +000079 }
80
81 TConnection* getTConnection() {
82 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000083 }
84
85 private:
86 boost::shared_ptr<TProcessor> processor_;
87 boost::shared_ptr<TProtocol> input_;
88 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +000089 TConnection* connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000090};
Mark Slee5ea15f92007-03-05 22:55:59 +000091
92void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000093 socket_ = socket;
94 server_ = s;
95 appState_ = APP_INIT;
96 eventFlags_ = 0;
97
98 readBufferPos_ = 0;
99 readWant_ = 0;
100
101 writeBuffer_ = NULL;
102 writeBufferSize_ = 0;
103 writeBufferPos_ = 0;
104
105 socketState_ = SOCKET_RECV;
106 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000107
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 // Set flags, which also registers the event
109 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000110
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000111 // get input/transports
112 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
113 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000114
115 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000116 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
117 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000118}
119
120void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000121 int flags=0, got=0, left=0, sent=0;
122 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000123
124 switch (socketState_) {
125 case SOCKET_RECV:
126 // It is an error to be in this state if we already have all the data
127 assert(readBufferPos_ < readWant_);
128
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000130 if (readWant_ > readBufferSize_) {
131 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 readBufferSize_ *= 2;
133 }
David Reissd7a16f42008-02-19 22:47:29 +0000134 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000135 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000136 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000137 close();
138 return;
139 }
140 }
141
142 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000143 fetch = readWant_ - readBufferPos_;
144 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000145
Mark Slee2f6404d2006-10-10 01:37:40 +0000146 if (got > 0) {
147 // Move along in the buffer
148 readBufferPos_ += got;
149
150 // Check that we did not overdo it
151 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000152
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 // We are done reading, move onto the next state
154 if (readBufferPos_ == readWant_) {
155 transition();
156 }
157 return;
158 } else if (got == -1) {
159 // Blocking errors are okay, just move on
160 if (errno == EAGAIN || errno == EWOULDBLOCK) {
161 return;
162 }
163
164 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000165 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 }
167 }
168
169 // Whenever we get down here it means a remote disconnect
170 close();
Mark Slee79b16942007-11-26 19:05:29 +0000171
Mark Slee2f6404d2006-10-10 01:37:40 +0000172 return;
173
174 case SOCKET_SEND:
175 // Should never have position past size
176 assert(writeBufferPos_ <= writeBufferSize_);
177
178 // If there is no data to send, then let us move on
179 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000180 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000181 transition();
182 return;
183 }
184
185 flags = 0;
186 #ifdef MSG_NOSIGNAL
187 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
188 // check for the EPIPE return condition and close the socket in that case
189 flags |= MSG_NOSIGNAL;
190 #endif // ifdef MSG_NOSIGNAL
191
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000192 left = writeBufferSize_ - writeBufferPos_;
193 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000194
195 if (sent <= 0) {
196 // Blocking errors are okay, just move on
197 if (errno == EAGAIN || errno == EWOULDBLOCK) {
198 return;
199 }
200 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000201 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 }
203 close();
204 return;
205 }
206
207 writeBufferPos_ += sent;
208
209 // Did we overdo it?
210 assert(writeBufferPos_ <= writeBufferSize_);
211
Mark Slee79b16942007-11-26 19:05:29 +0000212 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 if (writeBufferPos_ == writeBufferSize_) {
214 transition();
215 }
216
217 return;
218
219 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000220 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 assert(0);
222 }
223}
224
225/**
226 * This is called when the application transitions from one state into
227 * another. This means that it has finished writing the data that it needed
228 * to, or finished receiving the data that it needed to.
229 */
230void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000231
232 int sz = 0;
233
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 // Switch upon the state that we are currently in and move to a new state
235 switch (appState_) {
236
237 case APP_READ_REQUEST:
238 // We are done reading the request, package the read buffer into transport
239 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000240 // If we've used these transport buffers enough times, reset them to avoid bloating
241
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000243 ++numReadsSinceReset_;
244 if (numWritesSinceReset_ < 512) {
245 outputTransport_->resetBuffer();
246 } else {
247 // reset the capacity of the output transport if we used it enough times that it might be bloated
248 try {
249 outputTransport_->resetBuffer(true);
250 numWritesSinceReset_ = 0;
251 } catch (TTransportException &ttx) {
252 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
253 close();
254 return;
255 }
256 }
257
David Reiss52cb7a72008-06-30 21:40:35 +0000258 // Prepend four bytes of blank space to the buffer so we can
259 // write the frame size there later.
260 outputTransport_->getWritePtr(4);
261 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000262
David Reiss01fe1532010-03-09 05:19:25 +0000263 server_->incrementActiveProcessors();
264
Mark Sleee02385b2007-06-09 01:21:16 +0000265 if (server_->isThreadPoolProcessing()) {
266 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000267
David Reiss01fe1532010-03-09 05:19:25 +0000268 // Create task and dispatch to the thread manager
269 boost::shared_ptr<Runnable> task =
270 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
271 inputProtocol_,
272 outputProtocol_,
273 this));
274 // The application is now waiting on the task to finish
275 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000276
David Reisse11f3072008-10-07 21:39:19 +0000277 try {
278 server_->addTask(task);
279 } catch (IllegalStateException & ise) {
280 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000281 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000282 close();
283 }
Mark Slee402ee282007-08-23 01:43:20 +0000284
David Reiss01fe1532010-03-09 05:19:25 +0000285 // Set this connection idle so that libevent doesn't process more
286 // data on it while we're still waiting for the threadmanager to
287 // finish this task
288 setIdle();
289 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000290 } else {
291 try {
292 // Invoke the processor
293 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
294 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000295 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000296 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000297 close();
298 return;
299 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000300 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000301 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000302 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000303 return;
304 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000305 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000306 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000307 close();
308 return;
309 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000310 }
311
Mark Slee402ee282007-08-23 01:43:20 +0000312 // Intentionally fall through here, the call to process has written into
313 // the writeBuffer_
314
Mark Sleee02385b2007-06-09 01:21:16 +0000315 case APP_WAIT_TASK:
316 // We have now finished processing a task and the result has been written
317 // into the outputTransport_, so we grab its contents and place them into
318 // the writeBuffer_ for actual writing by the libevent thread
319
David Reiss01fe1532010-03-09 05:19:25 +0000320 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000321 // Get the result of the operation
322 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
323
324 // If the function call generated return data, then move into the send
325 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000326 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000327 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000328
329 // Move into write state
330 writeBufferPos_ = 0;
331 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000332
David Reissaf787782008-07-03 20:29:34 +0000333 // Put the frame size into the write buffer
334 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
335 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000336
337 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000338 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000339 setWrite();
340
341 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000342 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000343
344 return;
345 }
346
David Reissc51986f2009-03-24 20:01:25 +0000347 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000349 goto LABEL_APP_INIT;
350
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 case APP_SEND_RESULT:
352
Kevin Clark5ace1782009-03-04 21:10:58 +0000353 ++numWritesSinceReset_;
354
Mark Slee2f6404d2006-10-10 01:37:40 +0000355 // N.B.: We also intentionally fall through here into the INIT state!
356
Mark Slee92f00fb2006-10-25 01:28:17 +0000357 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 case APP_INIT:
359
Kevin Clark5ace1782009-03-04 21:10:58 +0000360 // reset the input buffer if we used it enough times that it might be bloated
361 if (numReadsSinceReset_ > 512)
362 {
363 void * new_buffer = std::realloc(readBuffer_, 1024);
364 if (new_buffer == NULL) {
365 GlobalOutput("TConnection::transition() realloc");
366 close();
367 return;
368 }
369 readBuffer_ = (uint8_t*) new_buffer;
370 readBufferSize_ = 1024;
371 numReadsSinceReset_ = 0;
372 }
373
Mark Slee2f6404d2006-10-10 01:37:40 +0000374 // Clear write buffer variables
375 writeBuffer_ = NULL;
376 writeBufferPos_ = 0;
377 writeBufferSize_ = 0;
378
379 // Set up read buffer for getting 4 bytes
380 readBufferPos_ = 0;
381 readWant_ = 4;
382
383 // Into read4 state we go
384 socketState_ = SOCKET_RECV;
385 appState_ = APP_READ_FRAME_SIZE;
386
387 // Register read event
388 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000389
Mark Slee2f6404d2006-10-10 01:37:40 +0000390 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000391 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000392
393 return;
394
395 case APP_READ_FRAME_SIZE:
396 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000397 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000398 sz = (int32_t)ntohl(sz);
399
400 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000401 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 close();
403 return;
404 }
405
406 // Reset the read buffer
407 readWant_ = (uint32_t)sz;
408 readBufferPos_= 0;
409
410 // Move into read request state
411 appState_ = APP_READ_REQUEST;
412
413 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000414 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000415
416 return;
417
David Reiss01fe1532010-03-09 05:19:25 +0000418 case APP_CLOSE_CONNECTION:
419 server_->decrementActiveProcessors();
420 close();
421 return;
422
Mark Slee2f6404d2006-10-10 01:37:40 +0000423 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000424 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000425 assert(0);
426 }
427}
428
429void TConnection::setFlags(short eventFlags) {
430 // Catch the do nothing case
431 if (eventFlags_ == eventFlags) {
432 return;
433 }
434
435 // Delete a previously existing event
436 if (eventFlags_ != 0) {
437 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000438 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000439 return;
440 }
441 }
442
443 // Update in memory structure
444 eventFlags_ = eventFlags;
445
Mark Slee402ee282007-08-23 01:43:20 +0000446 // Do not call event_set if there are no flags
447 if (!eventFlags_) {
448 return;
449 }
450
David Reiss01fe1532010-03-09 05:19:25 +0000451 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 * event_set:
453 *
454 * Prepares the event structure &event to be used in future calls to
455 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000456 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000457 *
458 * The events can be either EV_READ, EV_WRITE, or both, indicating
459 * that an application can read or write from the file respectively without
460 * blocking.
461 *
Mark Sleee02385b2007-06-09 01:21:16 +0000462 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000463 * the event and the type of event which will be one of: EV_TIMEOUT,
464 * EV_SIGNAL, EV_READ, EV_WRITE.
465 *
466 * The additional flag EV_PERSIST makes an event_add() persistent until
467 * event_del() has been called.
468 *
469 * Once initialized, the &event struct can be used repeatedly with
470 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000471 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000472 * when an ev structure has been added to libevent using event_add() the
473 * structure must persist until the event occurs (assuming EV_PERSIST
474 * is not set) or is removed using event_del(). You may not reuse the same
475 * ev structure for multiple monitored descriptors; each descriptor needs
476 * its own ev.
477 */
478 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000479 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000480
481 // Add the event
482 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000483 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000484 }
485}
486
487/**
488 * Closes a connection
489 */
490void TConnection::close() {
491 // Delete the registered libevent
492 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000493 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 }
495
496 // Close the socket
David Reiss01fe1532010-03-09 05:19:25 +0000497 if (socket_ >= 0) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000498 ::close(socket_);
499 }
David Reiss01fe1532010-03-09 05:19:25 +0000500 socket_ = -1;
Mark Slee2f6404d2006-10-10 01:37:40 +0000501
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000502 // close any factory produced transports
503 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000504 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000505
Mark Slee2f6404d2006-10-10 01:37:40 +0000506 // Give this object back to the server that owns it
507 server_->returnConnection(this);
508}
509
David Reiss01fe1532010-03-09 05:19:25 +0000510void TConnection::checkIdleBufferMemLimit(size_t limit) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000511 if (readBufferSize_ > limit) {
512 readBufferSize_ = limit;
513 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
514 if (readBuffer_ == NULL) {
515 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
516 close();
517 }
518 }
519}
520
Mark Slee2f6404d2006-10-10 01:37:40 +0000521/**
522 * Creates a new connection either by reusing an object off the stack or
523 * by allocating a new one entirely
524 */
525TConnection* TNonblockingServer::createConnection(int socket, short flags) {
526 // Check the stack
527 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000528 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000529 } else {
530 TConnection* result = connectionStack_.top();
531 connectionStack_.pop();
532 result->init(socket, flags, this);
533 return result;
534 }
535}
536
537/**
538 * Returns a connection to the stack
539 */
540void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000541 if (connectionStackLimit_ &&
542 (connectionStack_.size() >= connectionStackLimit_)) {
543 delete connection;
544 } else {
545 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
546 connectionStack_.push(connection);
547 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000548}
549
550/**
David Reissa79e4882008-03-05 07:51:47 +0000551 * Server socket had something happen. We accept all waiting client
552 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000553 */
554void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000555 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000556 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000557
Mark Slee2f6404d2006-10-10 01:37:40 +0000558 // Server socket accepted a new connection
559 socklen_t addrLen;
560 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000561 addrLen = sizeof(addr);
562
Mark Slee2f6404d2006-10-10 01:37:40 +0000563 // Going to accept a new client socket
564 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000565
Mark Slee2f6404d2006-10-10 01:37:40 +0000566 // Accept as many new clients as possible, even though libevent signaled only
567 // one, this helps us to avoid having to go back into the libevent engine so
568 // many times
569 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000570 // If we're overloaded, take action here
571 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
572 nConnectionsDropped_++;
573 nTotalConnectionsDropped_++;
574 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
575 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000576 return;
David Reiss01fe1532010-03-09 05:19:25 +0000577 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
578 if (!drainPendingTask()) {
579 // Nothing left to discard, so we drop connection instead.
580 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000581 return;
David Reiss01fe1532010-03-09 05:19:25 +0000582 }
583 }
584 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000585 // Explicitly set this socket to NONBLOCK mode
586 int flags;
587 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
588 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000589 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000590 close(clientSocket);
591 return;
592 }
593
594 // Create a new TConnection for this client socket.
595 TConnection* clientConnection =
596 createConnection(clientSocket, EV_READ | EV_PERSIST);
597
598 // Fail fast if we could not create a TConnection object
599 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000600 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000601 close(clientSocket);
602 return;
603 }
604
605 // Put this client connection into the proper state
606 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000607
608 // addrLen is written by the accept() call, so needs to be set before the next call.
609 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000610 }
Mark Slee79b16942007-11-26 19:05:29 +0000611
Mark Slee2f6404d2006-10-10 01:37:40 +0000612 // Done looping accept, now we have to make sure the error is due to
613 // blocking. Any other error is a problem
614 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000615 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 }
617}
618
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, prefers an IPv6 result when one
 * is offered, creates and binds the socket and then passes it to
 * listenSocket(int) to finish the setup.
 *
 * @throws TException if address resolution, socket creation or bind fails
 *         (listenSocket(int) may throw as well).
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any int rendered with "%d" (sign, 10 digits, NUL);
  // snprintf additionally guarantees the buffer cannot be overrun.
  char port[16];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail loudly like every other error path here; returning silently would
    // leave the server without a listening socket.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY on the IPv6 socket; failure is only logged.
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
683
684/**
685 * Takes a socket created by listenSocket() and sets various options on it
686 * to prepare for use in the server.
687 */
688void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000689 // Set socket to nonblocking mode
690 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000691 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
692 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
693 close(s);
694 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000695 }
696
697 int one = 1;
698 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000699
700 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000701 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000702
703 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000704 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000705
706 // Set TCP nodelay if available, MAC OS X Hack
707 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
708 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000709 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000710 #endif
711
David Reiss1c20c872010-03-09 05:20:14 +0000712 #ifdef TCP_LOW_MIN_RTO
713 if (TSocket::getUseLowMinRto()) {
714 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
715 }
716 #endif
717
Mark Slee79b16942007-11-26 19:05:29 +0000718 if (listen(s, LISTEN_BACKLOG) == -1) {
719 close(s);
720 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000721 }
722
Mark Slee79b16942007-11-26 19:05:29 +0000723 // Cool, this socket is good to go, set it as the serverSocket_
724 serverSocket_ = s;
725}
726
David Reiss01fe1532010-03-09 05:19:25 +0000727void TNonblockingServer::createNotificationPipe() {
728 if (pipe(notificationPipeFDs_) != 0) {
729 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
730 throw TException("can't create notification pipe");
731 }
David Reiss83b8fda2010-03-09 05:19:34 +0000732 int flags;
733 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
734 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
735 close(notificationPipeFDs_[0]);
736 close(notificationPipeFDs_[1]);
737 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
738 }
David Reiss01fe1532010-03-09 05:19:25 +0000739}
740
Mark Slee79b16942007-11-26 19:05:29 +0000741/**
742 * Register the core libevent events onto the proper base.
743 */
744void TNonblockingServer::registerEvents(event_base* base) {
745 assert(serverSocket_ != -1);
746 assert(!eventBase_);
747 eventBase_ = base;
748
749 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000750 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000751 event_get_version(),
752 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000753
754 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000755 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000756 serverSocket_,
757 EV_READ | EV_PERSIST,
758 TNonblockingServer::eventHandler,
759 this);
Mark Slee79b16942007-11-26 19:05:29 +0000760 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000761
762 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000763 if (-1 == event_add(&serverEvent_, 0)) {
764 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000765 }
David Reiss01fe1532010-03-09 05:19:25 +0000766 if (threadPoolProcessing_) {
767 // Create an event to be notified when a task finishes
768 event_set(&notificationEvent_,
769 getNotificationRecvFD(),
770 EV_READ | EV_PERSIST,
771 TConnection::taskHandler,
772 this);
David Reiss1c20c872010-03-09 05:20:14 +0000773
David Reiss01fe1532010-03-09 05:19:25 +0000774 // Attach to the base
775 event_base_set(eventBase_, &notificationEvent_);
776
777 // Add the event and start up the server
778 if (-1 == event_add(&notificationEvent_, 0)) {
779 throw TException("TNonblockingServer::serve(): notification event_add fail");
780 }
781 }
782}
783
David Reiss068f4162010-03-09 05:19:45 +0000784void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
785 threadManager_ = threadManager;
786 if (threadManager != NULL) {
787 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
788 threadPoolProcessing_ = true;
789 } else {
790 threadPoolProcessing_ = false;
791 }
792}
793
David Reiss01fe1532010-03-09 05:19:25 +0000794bool TNonblockingServer::serverOverloaded() {
795 size_t activeConnections = numTConnections_ - connectionStack_.size();
796 if (numActiveProcessors_ > maxActiveProcessors_ ||
797 activeConnections > maxConnections_) {
798 if (!overloaded_) {
799 GlobalOutput.printf("thrift non-blocking server overload condition");
800 overloaded_ = true;
801 }
802 } else {
803 if (overloaded_ &&
804 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
805 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
806 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
807 nConnectionsDropped_, nTotalConnectionsDropped_);
808 nConnectionsDropped_ = 0;
809 overloaded_ = false;
810 }
811 }
812
813 return overloaded_;
814}
815
816bool TNonblockingServer::drainPendingTask() {
817 if (threadManager_) {
818 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
819 if (task) {
820 TConnection* connection =
821 static_cast<TConnection::Task*>(task.get())->getTConnection();
822 assert(connection && connection->getServer()
823 && connection->getState() == APP_WAIT_TASK);
824 connection->forceClose();
825 return true;
826 }
827 }
828 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000829}
830
David Reiss068f4162010-03-09 05:19:45 +0000831void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
832 TConnection* connection =
833 static_cast<TConnection::Task*>(task.get())->getTConnection();
834 assert(connection && connection->getServer()
835 && connection->getState() == APP_WAIT_TASK);
836 connection->forceClose();
837}
838
Mark Slee79b16942007-11-26 19:05:29 +0000839/**
840 * Main workhorse function, starts up the server listening on a port and
841 * loops over the libevent handler.
842 */
843void TNonblockingServer::serve() {
844 // Init socket
845 listenSocket();
846
David Reiss01fe1532010-03-09 05:19:25 +0000847 if (threadPoolProcessing_) {
848 // Init task completion notification pipe
849 createNotificationPipe();
850 }
851
Mark Slee79b16942007-11-26 19:05:29 +0000852 // Initialize libevent core
853 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000854
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000855 // Run the preServe event
856 if (eventHandler_ != NULL) {
857 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000858 }
859
Mark Sleee02385b2007-06-09 01:21:16 +0000860 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000861 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000862}
863
T Jake Lucianib5e62212009-01-31 22:36:20 +0000864}}} // apache::thrift::server