blob: 32f021a2214cbb6ccf148539957341fd2477f621 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000022
Mark Sleee02385b2007-06-09 01:21:16 +000023#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000024#include <sys/socket.h>
25#include <netinet/in.h>
26#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000027#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000028#include <fcntl.h>
29#include <errno.h>
30#include <assert.h>
31
T Jake Lucianib5e62212009-01-31 22:36:20 +000032namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000033
T Jake Lucianib5e62212009-01-31 22:36:20 +000034using namespace apache::thrift::protocol;
35using namespace apache::thrift::transport;
36using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000037using namespace std;
38
39class TConnection::Task: public Runnable {
40 public:
41 Task(boost::shared_ptr<TProcessor> processor,
42 boost::shared_ptr<TProtocol> input,
43 boost::shared_ptr<TProtocol> output,
44 int taskHandle) :
45 processor_(processor),
46 input_(input),
47 output_(output),
48 taskHandle_(taskHandle) {}
49
50 void run() {
51 try {
52 while (processor_->process(input_, output_)) {
53 if (!input_->getTransport()->peek()) {
54 break;
55 }
56 }
57 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000058 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000059 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000060 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000061 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000062 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000063 }
Mark Slee79b16942007-11-26 19:05:29 +000064
Mark Sleee02385b2007-06-09 01:21:16 +000065 // Signal completion back to the libevent thread via a socketpair
66 int8_t b = 0;
67 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
David Reiss01e55c12008-07-13 22:18:51 +000068 GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000069 }
70 if (-1 == ::close(taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +000071 GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000072 }
73 }
74
75 private:
76 boost::shared_ptr<TProcessor> processor_;
77 boost::shared_ptr<TProtocol> input_;
78 boost::shared_ptr<TProtocol> output_;
79 int taskHandle_;
80};
Mark Slee5ea15f92007-03-05 22:55:59 +000081
82void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000083 socket_ = socket;
84 server_ = s;
85 appState_ = APP_INIT;
86 eventFlags_ = 0;
87
88 readBufferPos_ = 0;
89 readWant_ = 0;
90
91 writeBuffer_ = NULL;
92 writeBufferSize_ = 0;
93 writeBufferPos_ = 0;
94
95 socketState_ = SOCKET_RECV;
96 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +000097
Mark Sleee02385b2007-06-09 01:21:16 +000098 taskHandle_ = -1;
99
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 // Set flags, which also registers the event
101 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000102
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000103 // get input/transports
104 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
105 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000106
107 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000108 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
109 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000110}
111
112void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000113 int flags=0, got=0, left=0, sent=0;
114 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000115
116 switch (socketState_) {
117 case SOCKET_RECV:
118 // It is an error to be in this state if we already have all the data
119 assert(readBufferPos_ < readWant_);
120
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000122 if (readWant_ > readBufferSize_) {
123 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 readBufferSize_ *= 2;
125 }
David Reissd7a16f42008-02-19 22:47:29 +0000126 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000127 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000128 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 close();
130 return;
131 }
132 }
133
134 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000135 fetch = readWant_ - readBufferPos_;
136 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000137
Mark Slee2f6404d2006-10-10 01:37:40 +0000138 if (got > 0) {
139 // Move along in the buffer
140 readBufferPos_ += got;
141
142 // Check that we did not overdo it
143 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000144
Mark Slee2f6404d2006-10-10 01:37:40 +0000145 // We are done reading, move onto the next state
146 if (readBufferPos_ == readWant_) {
147 transition();
148 }
149 return;
150 } else if (got == -1) {
151 // Blocking errors are okay, just move on
152 if (errno == EAGAIN || errno == EWOULDBLOCK) {
153 return;
154 }
155
156 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000157 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 }
159 }
160
161 // Whenever we get down here it means a remote disconnect
162 close();
Mark Slee79b16942007-11-26 19:05:29 +0000163
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 return;
165
166 case SOCKET_SEND:
167 // Should never have position past size
168 assert(writeBufferPos_ <= writeBufferSize_);
169
170 // If there is no data to send, then let us move on
171 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000172 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 transition();
174 return;
175 }
176
177 flags = 0;
178 #ifdef MSG_NOSIGNAL
179 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
180 // check for the EPIPE return condition and close the socket in that case
181 flags |= MSG_NOSIGNAL;
182 #endif // ifdef MSG_NOSIGNAL
183
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000184 left = writeBufferSize_ - writeBufferPos_;
185 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000186
187 if (sent <= 0) {
188 // Blocking errors are okay, just move on
189 if (errno == EAGAIN || errno == EWOULDBLOCK) {
190 return;
191 }
192 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000193 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 }
195 close();
196 return;
197 }
198
199 writeBufferPos_ += sent;
200
201 // Did we overdo it?
202 assert(writeBufferPos_ <= writeBufferSize_);
203
Mark Slee79b16942007-11-26 19:05:29 +0000204 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 if (writeBufferPos_ == writeBufferSize_) {
206 transition();
207 }
208
209 return;
210
211 default:
David Reiss01e55c12008-07-13 22:18:51 +0000212 GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 assert(0);
214 }
215}
216
217/**
218 * This is called when the application transitions from one state into
219 * another. This means that it has finished writing the data that it needed
220 * to, or finished receiving the data that it needed to.
221 */
222void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000223
224 int sz = 0;
225
Mark Slee2f6404d2006-10-10 01:37:40 +0000226 // Switch upon the state that we are currently in and move to a new state
227 switch (appState_) {
228
229 case APP_READ_REQUEST:
230 // We are done reading the request, package the read buffer into transport
231 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000232 // If we've used these transport buffers enough times, reset them to avoid bloating
233
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000235 ++numReadsSinceReset_;
236 if (numWritesSinceReset_ < 512) {
237 outputTransport_->resetBuffer();
238 } else {
239 // reset the capacity of the output transport if we used it enough times that it might be bloated
240 try {
241 outputTransport_->resetBuffer(true);
242 numWritesSinceReset_ = 0;
243 } catch (TTransportException &ttx) {
244 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
245 close();
246 return;
247 }
248 }
249
David Reiss52cb7a72008-06-30 21:40:35 +0000250 // Prepend four bytes of blank space to the buffer so we can
251 // write the frame size there later.
252 outputTransport_->getWritePtr(4);
253 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000254
Mark Sleee02385b2007-06-09 01:21:16 +0000255 if (server_->isThreadPoolProcessing()) {
256 // We are setting up a Task to do this work and we will wait on it
257 int sv[2];
258 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
David Reiss01e55c12008-07-13 22:18:51 +0000259 GlobalOutput.perror("TConnection::socketpair() failed ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000260 // Now we will fall through to the APP_WAIT_TASK block with no response
261 } else {
262 // Create task and dispatch to the thread manager
263 boost::shared_ptr<Runnable> task =
264 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
265 inputProtocol_,
266 outputProtocol_,
267 sv[1]));
Mark Slee79b16942007-11-26 19:05:29 +0000268 // The application is now waiting on the task to finish
Mark Sleee02385b2007-06-09 01:21:16 +0000269 appState_ = APP_WAIT_TASK;
Mark Slee79b16942007-11-26 19:05:29 +0000270
271 // Create an event to be notified when the task finishes
Mark Sleee02385b2007-06-09 01:21:16 +0000272 event_set(&taskEvent_,
273 taskHandle_ = sv[0],
274 EV_READ,
275 TConnection::taskHandler,
276 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000277
Mark Slee79b16942007-11-26 19:05:29 +0000278 // Attach to the base
279 event_base_set(server_->getEventBase(), &taskEvent_);
280
Mark Sleee02385b2007-06-09 01:21:16 +0000281 // Add the event and start up the server
282 if (-1 == event_add(&taskEvent_, 0)) {
283 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
284 return;
285 }
David Reisse11f3072008-10-07 21:39:19 +0000286 try {
287 server_->addTask(task);
288 } catch (IllegalStateException & ise) {
289 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000290 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000291 close();
292 }
Mark Slee402ee282007-08-23 01:43:20 +0000293
294 // Set this connection idle so that libevent doesn't process more
295 // data on it while we're still waiting for the threadmanager to
296 // finish this task
297 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000298 return;
299 }
300 } else {
301 try {
302 // Invoke the processor
303 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
304 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000305 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000306 close();
307 return;
308 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000309 GlobalOutput.printf("TException: Server::process() %s", x.what());
Mark Slee79b16942007-11-26 19:05:29 +0000310 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000311 return;
312 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000313 GlobalOutput.printf("Server::process() unknown exception");
Mark Sleee02385b2007-06-09 01:21:16 +0000314 close();
315 return;
316 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000317 }
318
Mark Slee402ee282007-08-23 01:43:20 +0000319 // Intentionally fall through here, the call to process has written into
320 // the writeBuffer_
321
Mark Sleee02385b2007-06-09 01:21:16 +0000322 case APP_WAIT_TASK:
323 // We have now finished processing a task and the result has been written
324 // into the outputTransport_, so we grab its contents and place them into
325 // the writeBuffer_ for actual writing by the libevent thread
326
Mark Slee2f6404d2006-10-10 01:37:40 +0000327 // Get the result of the operation
328 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
329
330 // If the function call generated return data, then move into the send
331 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000332 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000333 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000334
335 // Move into write state
336 writeBufferPos_ = 0;
337 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000338
David Reissaf787782008-07-03 20:29:34 +0000339 // Put the frame size into the write buffer
340 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
341 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000342
343 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000344 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000345 setWrite();
346
347 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000348 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000349
350 return;
351 }
352
David Reissc51986f2009-03-24 20:01:25 +0000353 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000355 goto LABEL_APP_INIT;
356
Mark Slee2f6404d2006-10-10 01:37:40 +0000357 case APP_SEND_RESULT:
358
Kevin Clark5ace1782009-03-04 21:10:58 +0000359 ++numWritesSinceReset_;
360
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 // N.B.: We also intentionally fall through here into the INIT state!
362
Mark Slee92f00fb2006-10-25 01:28:17 +0000363 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000364 case APP_INIT:
365
Kevin Clark5ace1782009-03-04 21:10:58 +0000366 // reset the input buffer if we used it enough times that it might be bloated
367 if (numReadsSinceReset_ > 512)
368 {
369 void * new_buffer = std::realloc(readBuffer_, 1024);
370 if (new_buffer == NULL) {
371 GlobalOutput("TConnection::transition() realloc");
372 close();
373 return;
374 }
375 readBuffer_ = (uint8_t*) new_buffer;
376 readBufferSize_ = 1024;
377 numReadsSinceReset_ = 0;
378 }
379
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 // Clear write buffer variables
381 writeBuffer_ = NULL;
382 writeBufferPos_ = 0;
383 writeBufferSize_ = 0;
384
385 // Set up read buffer for getting 4 bytes
386 readBufferPos_ = 0;
387 readWant_ = 4;
388
389 // Into read4 state we go
390 socketState_ = SOCKET_RECV;
391 appState_ = APP_READ_FRAME_SIZE;
392
393 // Register read event
394 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000395
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000397 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000398
399 return;
400
401 case APP_READ_FRAME_SIZE:
402 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000403 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000404 sz = (int32_t)ntohl(sz);
405
406 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000407 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 close();
409 return;
410 }
411
412 // Reset the read buffer
413 readWant_ = (uint32_t)sz;
414 readBufferPos_= 0;
415
416 // Move into read request state
417 appState_ = APP_READ_REQUEST;
418
419 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000420 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000421
422 return;
423
424 default:
David Reiss01e55c12008-07-13 22:18:51 +0000425 GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000426 assert(0);
427 }
428}
429
430void TConnection::setFlags(short eventFlags) {
431 // Catch the do nothing case
432 if (eventFlags_ == eventFlags) {
433 return;
434 }
435
436 // Delete a previously existing event
437 if (eventFlags_ != 0) {
438 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000439 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000440 return;
441 }
442 }
443
444 // Update in memory structure
445 eventFlags_ = eventFlags;
446
Mark Slee402ee282007-08-23 01:43:20 +0000447 // Do not call event_set if there are no flags
448 if (!eventFlags_) {
449 return;
450 }
451
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 /**
453 * event_set:
454 *
455 * Prepares the event structure &event to be used in future calls to
456 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000457 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000458 *
459 * The events can be either EV_READ, EV_WRITE, or both, indicating
460 * that an application can read or write from the file respectively without
461 * blocking.
462 *
Mark Sleee02385b2007-06-09 01:21:16 +0000463 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000464 * the event and the type of event which will be one of: EV_TIMEOUT,
465 * EV_SIGNAL, EV_READ, EV_WRITE.
466 *
467 * The additional flag EV_PERSIST makes an event_add() persistent until
468 * event_del() has been called.
469 *
470 * Once initialized, the &event struct can be used repeatedly with
471 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000472 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000473 * when an ev structure has been added to libevent using event_add() the
474 * structure must persist until the event occurs (assuming EV_PERSIST
475 * is not set) or is removed using event_del(). You may not reuse the same
476 * ev structure for multiple monitored descriptors; each descriptor needs
477 * its own ev.
478 */
479 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000480 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000481
482 // Add the event
483 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000484 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000485 }
486}
487
488/**
489 * Closes a connection
490 */
491void TConnection::close() {
492 // Delete the registered libevent
493 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000494 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000495 }
496
497 // Close the socket
498 if (socket_ > 0) {
499 ::close(socket_);
500 }
501 socket_ = 0;
502
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000503 // close any factory produced transports
504 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000505 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000506
Mark Slee2f6404d2006-10-10 01:37:40 +0000507 // Give this object back to the server that owns it
508 server_->returnConnection(this);
509}
510
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000511void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
512 if (readBufferSize_ > limit) {
513 readBufferSize_ = limit;
514 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
515 if (readBuffer_ == NULL) {
516 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
517 close();
518 }
519 }
520}
521
Mark Slee2f6404d2006-10-10 01:37:40 +0000522/**
523 * Creates a new connection either by reusing an object off the stack or
524 * by allocating a new one entirely
525 */
526TConnection* TNonblockingServer::createConnection(int socket, short flags) {
527 // Check the stack
528 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000529 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 } else {
531 TConnection* result = connectionStack_.top();
532 connectionStack_.pop();
533 result->init(socket, flags, this);
534 return result;
535 }
536}
537
538/**
539 * Returns a connection to the stack
540 */
541void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000542 if (connectionStackLimit_ &&
543 (connectionStack_.size() >= connectionStackLimit_)) {
544 delete connection;
545 } else {
546 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
547 connectionStack_.push(connection);
548 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000549}
550
551/**
David Reissa79e4882008-03-05 07:51:47 +0000552 * Server socket had something happen. We accept all waiting client
553 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000554 */
555void TNonblockingServer::handleEvent(int fd, short which) {
556 // Make sure that libevent didn't fuck up the socket handles
557 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000558
Mark Slee2f6404d2006-10-10 01:37:40 +0000559 // Server socket accepted a new connection
560 socklen_t addrLen;
561 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000562 addrLen = sizeof(addr);
563
Mark Slee2f6404d2006-10-10 01:37:40 +0000564 // Going to accept a new client socket
565 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000566
Mark Slee2f6404d2006-10-10 01:37:40 +0000567 // Accept as many new clients as possible, even though libevent signaled only
568 // one, this helps us to avoid having to go back into the libevent engine so
569 // many times
570 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
571
572 // Explicitly set this socket to NONBLOCK mode
573 int flags;
574 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
575 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000576 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000577 close(clientSocket);
578 return;
579 }
580
581 // Create a new TConnection for this client socket.
582 TConnection* clientConnection =
583 createConnection(clientSocket, EV_READ | EV_PERSIST);
584
585 // Fail fast if we could not create a TConnection object
586 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000587 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000588 close(clientSocket);
589 return;
590 }
591
592 // Put this client connection into the proper state
593 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000594
595 // addrLen is written by the accept() call, so needs to be set before the next call.
596 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000597 }
Mark Slee79b16942007-11-26 19:05:29 +0000598
Mark Slee2f6404d2006-10-10 01:37:40 +0000599 // Done looping accept, now we have to make sure the error is due to
600 // blocking. Any other error is a problem
601 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000602 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000603 }
604}
605
/**
 * Creates the server's listening socket on port_ (wildcard address), binds
 * it, and passes it to listenSocket(int) for the remaining setup.
 *
 * When getaddrinfo() returns an IPv6 address it is preferred, with
 * IPV6_V6ONLY cleared where supported so IPv4 clients can still connect via
 * mapped addresses; otherwise the last returned address is used.
 *
 * @throws TException if address resolution, socket() or bind() fails, or if
 *         listenSocket(int) rejects the socket.
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints;
  struct addrinfo* res0 = NULL;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // port_ is an int, so size the buffer for any int and use snprintf; the
  // previous 7-byte buffer with sprintf could overflow on out-of-range values.
  char port[sizeof("-2147483648")];
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Fail like every other setup error below. Merely logging and returning
    // left serverSocket_ unset, and serve() went on to register events on it.
    throw TException("TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. getaddrinfo() succeeded, so res0 is non-NULL.
  struct addrinfo* res = res0;
  while (res->ai_family != AF_INET6 && res->ai_next != NULL) {
    res = res->ai_next;
  }

  // Create the server socket
  int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    GlobalOutput.perror("TNonblockingServer::serve() socket() ", errno);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Only meaningful for IPv6 sockets; on an AF_INET socket the call just
  // fails and logs a misleading error.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    // Preserve the cause (e.g. address already in use) before close() can
    // overwrite errno.
    int bindErrno = errno;
    ::close(s);
    freeaddrinfo(res0);
    GlobalOutput.perror("TNonblockingServer::serve() bind ", bindErrno);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
668
669/**
670 * Takes a socket created by listenSocket() and sets various options on it
671 * to prepare for use in the server.
672 */
673void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000674 // Set socket to nonblocking mode
675 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000676 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
677 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
678 close(s);
679 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000680 }
681
682 int one = 1;
683 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000684
685 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000686 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000687
688 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000689 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000690
691 // Set TCP nodelay if available, MAC OS X Hack
692 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
693 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000694 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000695 #endif
696
Mark Slee79b16942007-11-26 19:05:29 +0000697 if (listen(s, LISTEN_BACKLOG) == -1) {
698 close(s);
699 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 }
701
Mark Slee79b16942007-11-26 19:05:29 +0000702 // Cool, this socket is good to go, set it as the serverSocket_
703 serverSocket_ = s;
704}
705
706/**
707 * Register the core libevent events onto the proper base.
708 */
709void TNonblockingServer::registerEvents(event_base* base) {
710 assert(serverSocket_ != -1);
711 assert(!eventBase_);
712 eventBase_ = base;
713
714 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000715 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000716 event_get_version(),
717 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000718
719 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000720 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000721 serverSocket_,
722 EV_READ | EV_PERSIST,
723 TNonblockingServer::eventHandler,
724 this);
Mark Slee79b16942007-11-26 19:05:29 +0000725 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000726
727 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000728 if (-1 == event_add(&serverEvent_, 0)) {
729 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000730 }
Mark Slee79b16942007-11-26 19:05:29 +0000731}
732
733/**
734 * Main workhorse function, starts up the server listening on a port and
735 * loops over the libevent handler.
736 */
737void TNonblockingServer::serve() {
738 // Init socket
739 listenSocket();
740
741 // Initialize libevent core
742 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000743
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000744 // Run the preServe event
745 if (eventHandler_ != NULL) {
746 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000747 }
748
Mark Sleee02385b2007-06-09 01:21:16 +0000749 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000750 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000751}
752
T Jake Lucianib5e62212009-01-31 22:36:20 +0000753}}} // apache::thrift::server