blob: 5375387a2598a463432182d537eb0d9246502b60 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
31
David Reiss9b903442009-10-21 05:51:28 +000032#ifndef AF_LOCAL
33#define AF_LOCAL AF_UNIX
34#endif
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::protocol;
39using namespace apache::thrift::transport;
40using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000041using namespace std;
42
43class TConnection::Task: public Runnable {
44 public:
45 Task(boost::shared_ptr<TProcessor> processor,
46 boost::shared_ptr<TProtocol> input,
47 boost::shared_ptr<TProtocol> output,
48 int taskHandle) :
49 processor_(processor),
50 input_(input),
51 output_(output),
52 taskHandle_(taskHandle) {}
53
54 void run() {
55 try {
56 while (processor_->process(input_, output_)) {
57 if (!input_->getTransport()->peek()) {
58 break;
59 }
60 }
61 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000062 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000063 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000064 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000065 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000066 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000067 }
Mark Slee79b16942007-11-26 19:05:29 +000068
Mark Sleee02385b2007-06-09 01:21:16 +000069 // Signal completion back to the libevent thread via a socketpair
70 int8_t b = 0;
71 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
David Reiss01e55c12008-07-13 22:18:51 +000072 GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000073 }
74 if (-1 == ::close(taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +000075 GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000076 }
77 }
78
79 private:
80 boost::shared_ptr<TProcessor> processor_;
81 boost::shared_ptr<TProtocol> input_;
82 boost::shared_ptr<TProtocol> output_;
83 int taskHandle_;
84};
Mark Slee5ea15f92007-03-05 22:55:59 +000085
86void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000087 socket_ = socket;
88 server_ = s;
89 appState_ = APP_INIT;
90 eventFlags_ = 0;
91
92 readBufferPos_ = 0;
93 readWant_ = 0;
94
95 writeBuffer_ = NULL;
96 writeBufferSize_ = 0;
97 writeBufferPos_ = 0;
98
99 socketState_ = SOCKET_RECV;
100 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +0000101
Mark Sleee02385b2007-06-09 01:21:16 +0000102 taskHandle_ = -1;
103
Mark Slee2f6404d2006-10-10 01:37:40 +0000104 // Set flags, which also registers the event
105 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000106
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000107 // get input/transports
108 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
109 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000110
111 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000112 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
113 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000114}
115
116void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000117 int flags=0, got=0, left=0, sent=0;
118 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000119
120 switch (socketState_) {
121 case SOCKET_RECV:
122 // It is an error to be in this state if we already have all the data
123 assert(readBufferPos_ < readWant_);
124
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000126 if (readWant_ > readBufferSize_) {
127 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000128 readBufferSize_ *= 2;
129 }
David Reissd7a16f42008-02-19 22:47:29 +0000130 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000132 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000133 close();
134 return;
135 }
136 }
137
138 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000139 fetch = readWant_ - readBufferPos_;
140 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000141
Mark Slee2f6404d2006-10-10 01:37:40 +0000142 if (got > 0) {
143 // Move along in the buffer
144 readBufferPos_ += got;
145
146 // Check that we did not overdo it
147 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000148
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 // We are done reading, move onto the next state
150 if (readBufferPos_ == readWant_) {
151 transition();
152 }
153 return;
154 } else if (got == -1) {
155 // Blocking errors are okay, just move on
156 if (errno == EAGAIN || errno == EWOULDBLOCK) {
157 return;
158 }
159
160 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000161 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 }
163 }
164
165 // Whenever we get down here it means a remote disconnect
166 close();
Mark Slee79b16942007-11-26 19:05:29 +0000167
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 return;
169
170 case SOCKET_SEND:
171 // Should never have position past size
172 assert(writeBufferPos_ <= writeBufferSize_);
173
174 // If there is no data to send, then let us move on
175 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000176 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 transition();
178 return;
179 }
180
181 flags = 0;
182 #ifdef MSG_NOSIGNAL
183 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
184 // check for the EPIPE return condition and close the socket in that case
185 flags |= MSG_NOSIGNAL;
186 #endif // ifdef MSG_NOSIGNAL
187
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000188 left = writeBufferSize_ - writeBufferPos_;
189 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000190
191 if (sent <= 0) {
192 // Blocking errors are okay, just move on
193 if (errno == EAGAIN || errno == EWOULDBLOCK) {
194 return;
195 }
196 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000197 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000198 }
199 close();
200 return;
201 }
202
203 writeBufferPos_ += sent;
204
205 // Did we overdo it?
206 assert(writeBufferPos_ <= writeBufferSize_);
207
Mark Slee79b16942007-11-26 19:05:29 +0000208 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 if (writeBufferPos_ == writeBufferSize_) {
210 transition();
211 }
212
213 return;
214
215 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000216 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000217 assert(0);
218 }
219}
220
221/**
222 * This is called when the application transitions from one state into
223 * another. This means that it has finished writing the data that it needed
224 * to, or finished receiving the data that it needed to.
225 */
226void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000227
228 int sz = 0;
229
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 // Switch upon the state that we are currently in and move to a new state
231 switch (appState_) {
232
233 case APP_READ_REQUEST:
234 // We are done reading the request, package the read buffer into transport
235 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000236 // If we've used these transport buffers enough times, reset them to avoid bloating
237
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000239 ++numReadsSinceReset_;
240 if (numWritesSinceReset_ < 512) {
241 outputTransport_->resetBuffer();
242 } else {
243 // reset the capacity of the output transport if we used it enough times that it might be bloated
244 try {
245 outputTransport_->resetBuffer(true);
246 numWritesSinceReset_ = 0;
247 } catch (TTransportException &ttx) {
248 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
249 close();
250 return;
251 }
252 }
253
David Reiss52cb7a72008-06-30 21:40:35 +0000254 // Prepend four bytes of blank space to the buffer so we can
255 // write the frame size there later.
256 outputTransport_->getWritePtr(4);
257 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000258
Mark Sleee02385b2007-06-09 01:21:16 +0000259 if (server_->isThreadPoolProcessing()) {
260 // We are setting up a Task to do this work and we will wait on it
261 int sv[2];
262 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
David Reiss01e55c12008-07-13 22:18:51 +0000263 GlobalOutput.perror("TConnection::socketpair() failed ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000264 // Now we will fall through to the APP_WAIT_TASK block with no response
265 } else {
266 // Create task and dispatch to the thread manager
267 boost::shared_ptr<Runnable> task =
268 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
269 inputProtocol_,
270 outputProtocol_,
271 sv[1]));
Mark Slee79b16942007-11-26 19:05:29 +0000272 // The application is now waiting on the task to finish
Mark Sleee02385b2007-06-09 01:21:16 +0000273 appState_ = APP_WAIT_TASK;
Mark Slee79b16942007-11-26 19:05:29 +0000274
275 // Create an event to be notified when the task finishes
Mark Sleee02385b2007-06-09 01:21:16 +0000276 event_set(&taskEvent_,
277 taskHandle_ = sv[0],
278 EV_READ,
279 TConnection::taskHandler,
280 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000281
Mark Slee79b16942007-11-26 19:05:29 +0000282 // Attach to the base
283 event_base_set(server_->getEventBase(), &taskEvent_);
284
Mark Sleee02385b2007-06-09 01:21:16 +0000285 // Add the event and start up the server
286 if (-1 == event_add(&taskEvent_, 0)) {
287 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
288 return;
289 }
David Reisse11f3072008-10-07 21:39:19 +0000290 try {
291 server_->addTask(task);
292 } catch (IllegalStateException & ise) {
293 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000294 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000295 close();
296 }
Mark Slee402ee282007-08-23 01:43:20 +0000297
298 // Set this connection idle so that libevent doesn't process more
299 // data on it while we're still waiting for the threadmanager to
300 // finish this task
301 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000302 return;
303 }
304 } else {
305 try {
306 // Invoke the processor
307 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
308 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000309 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000310 close();
311 return;
312 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000313 GlobalOutput.printf("TException: Server::process() %s", x.what());
Mark Slee79b16942007-11-26 19:05:29 +0000314 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000315 return;
316 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000317 GlobalOutput.printf("Server::process() unknown exception");
Mark Sleee02385b2007-06-09 01:21:16 +0000318 close();
319 return;
320 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000321 }
322
Mark Slee402ee282007-08-23 01:43:20 +0000323 // Intentionally fall through here, the call to process has written into
324 // the writeBuffer_
325
Mark Sleee02385b2007-06-09 01:21:16 +0000326 case APP_WAIT_TASK:
327 // We have now finished processing a task and the result has been written
328 // into the outputTransport_, so we grab its contents and place them into
329 // the writeBuffer_ for actual writing by the libevent thread
330
Mark Slee2f6404d2006-10-10 01:37:40 +0000331 // Get the result of the operation
332 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
333
334 // If the function call generated return data, then move into the send
335 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000336 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000337 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000338
339 // Move into write state
340 writeBufferPos_ = 0;
341 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000342
David Reissaf787782008-07-03 20:29:34 +0000343 // Put the frame size into the write buffer
344 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
345 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000346
347 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000348 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000349 setWrite();
350
351 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000352 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000353
354 return;
355 }
356
David Reissc51986f2009-03-24 20:01:25 +0000357 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000359 goto LABEL_APP_INIT;
360
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 case APP_SEND_RESULT:
362
Kevin Clark5ace1782009-03-04 21:10:58 +0000363 ++numWritesSinceReset_;
364
Mark Slee2f6404d2006-10-10 01:37:40 +0000365 // N.B.: We also intentionally fall through here into the INIT state!
366
Mark Slee92f00fb2006-10-25 01:28:17 +0000367 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000368 case APP_INIT:
369
Kevin Clark5ace1782009-03-04 21:10:58 +0000370 // reset the input buffer if we used it enough times that it might be bloated
371 if (numReadsSinceReset_ > 512)
372 {
373 void * new_buffer = std::realloc(readBuffer_, 1024);
374 if (new_buffer == NULL) {
375 GlobalOutput("TConnection::transition() realloc");
376 close();
377 return;
378 }
379 readBuffer_ = (uint8_t*) new_buffer;
380 readBufferSize_ = 1024;
381 numReadsSinceReset_ = 0;
382 }
383
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 // Clear write buffer variables
385 writeBuffer_ = NULL;
386 writeBufferPos_ = 0;
387 writeBufferSize_ = 0;
388
389 // Set up read buffer for getting 4 bytes
390 readBufferPos_ = 0;
391 readWant_ = 4;
392
393 // Into read4 state we go
394 socketState_ = SOCKET_RECV;
395 appState_ = APP_READ_FRAME_SIZE;
396
397 // Register read event
398 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000399
Mark Slee2f6404d2006-10-10 01:37:40 +0000400 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000401 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000402
403 return;
404
405 case APP_READ_FRAME_SIZE:
406 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000407 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 sz = (int32_t)ntohl(sz);
409
410 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000411 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000412 close();
413 return;
414 }
415
416 // Reset the read buffer
417 readWant_ = (uint32_t)sz;
418 readBufferPos_= 0;
419
420 // Move into read request state
421 appState_ = APP_READ_REQUEST;
422
423 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000424 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000425
426 return;
427
428 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000429 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000430 assert(0);
431 }
432}
433
434void TConnection::setFlags(short eventFlags) {
435 // Catch the do nothing case
436 if (eventFlags_ == eventFlags) {
437 return;
438 }
439
440 // Delete a previously existing event
441 if (eventFlags_ != 0) {
442 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000443 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000444 return;
445 }
446 }
447
448 // Update in memory structure
449 eventFlags_ = eventFlags;
450
Mark Slee402ee282007-08-23 01:43:20 +0000451 // Do not call event_set if there are no flags
452 if (!eventFlags_) {
453 return;
454 }
455
Mark Slee2f6404d2006-10-10 01:37:40 +0000456 /**
457 * event_set:
458 *
459 * Prepares the event structure &event to be used in future calls to
460 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000461 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000462 *
463 * The events can be either EV_READ, EV_WRITE, or both, indicating
464 * that an application can read or write from the file respectively without
465 * blocking.
466 *
Mark Sleee02385b2007-06-09 01:21:16 +0000467 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000468 * the event and the type of event which will be one of: EV_TIMEOUT,
469 * EV_SIGNAL, EV_READ, EV_WRITE.
470 *
471 * The additional flag EV_PERSIST makes an event_add() persistent until
472 * event_del() has been called.
473 *
474 * Once initialized, the &event struct can be used repeatedly with
475 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000476 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000477 * when an ev structure has been added to libevent using event_add() the
478 * structure must persist until the event occurs (assuming EV_PERSIST
479 * is not set) or is removed using event_del(). You may not reuse the same
480 * ev structure for multiple monitored descriptors; each descriptor needs
481 * its own ev.
482 */
483 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000484 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000485
486 // Add the event
487 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000488 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000489 }
490}
491
492/**
493 * Closes a connection
494 */
495void TConnection::close() {
496 // Delete the registered libevent
497 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000498 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000499 }
500
501 // Close the socket
502 if (socket_ > 0) {
503 ::close(socket_);
504 }
505 socket_ = 0;
506
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000507 // close any factory produced transports
508 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000509 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000510
Mark Slee2f6404d2006-10-10 01:37:40 +0000511 // Give this object back to the server that owns it
512 server_->returnConnection(this);
513}
514
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000515void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
516 if (readBufferSize_ > limit) {
517 readBufferSize_ = limit;
518 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
519 if (readBuffer_ == NULL) {
520 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
521 close();
522 }
523 }
524}
525
Mark Slee2f6404d2006-10-10 01:37:40 +0000526/**
527 * Creates a new connection either by reusing an object off the stack or
528 * by allocating a new one entirely
529 */
530TConnection* TNonblockingServer::createConnection(int socket, short flags) {
531 // Check the stack
532 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000533 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000534 } else {
535 TConnection* result = connectionStack_.top();
536 connectionStack_.pop();
537 result->init(socket, flags, this);
538 return result;
539 }
540}
541
542/**
543 * Returns a connection to the stack
544 */
545void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000546 if (connectionStackLimit_ &&
547 (connectionStack_.size() >= connectionStackLimit_)) {
548 delete connection;
549 } else {
550 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
551 connectionStack_.push(connection);
552 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000553}
554
555/**
David Reissa79e4882008-03-05 07:51:47 +0000556 * Server socket had something happen. We accept all waiting client
557 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000558 */
559void TNonblockingServer::handleEvent(int fd, short which) {
David Reiss3bb5e052010-01-25 19:31:31 +0000560 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000561 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000562
Mark Slee2f6404d2006-10-10 01:37:40 +0000563 // Server socket accepted a new connection
564 socklen_t addrLen;
565 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000566 addrLen = sizeof(addr);
567
Mark Slee2f6404d2006-10-10 01:37:40 +0000568 // Going to accept a new client socket
569 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000570
Mark Slee2f6404d2006-10-10 01:37:40 +0000571 // Accept as many new clients as possible, even though libevent signaled only
572 // one, this helps us to avoid having to go back into the libevent engine so
573 // many times
574 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
575
576 // Explicitly set this socket to NONBLOCK mode
577 int flags;
578 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
579 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000580 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000581 close(clientSocket);
582 return;
583 }
584
585 // Create a new TConnection for this client socket.
586 TConnection* clientConnection =
587 createConnection(clientSocket, EV_READ | EV_PERSIST);
588
589 // Fail fast if we could not create a TConnection object
590 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000591 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000592 close(clientSocket);
593 return;
594 }
595
596 // Put this client connection into the proper state
597 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000598
599 // addrLen is written by the accept() call, so needs to be set before the next call.
600 addrLen = sizeof(addr);
Mark Slee2f6404d2006-10-10 01:37:40 +0000601 }
Mark Slee79b16942007-11-26 19:05:29 +0000602
Mark Slee2f6404d2006-10-10 01:37:40 +0000603 // Done looping accept, now we have to make sure the error is due to
604 // blocking. Any other error is a problem
605 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000606 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000607 }
608}
609
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the passive (wildcard) address for port_, preferring an
 * AF_INET6 result. On an IPv6 socket, IPV6_V6ONLY is cleared so IPv4
 * clients can connect via mapped addresses. The socket gets SO_REUSEADDR,
 * is bound, and is then handed to listenSocket(int) for the rest of the
 * setup.
 *
 * @throws TException if address resolution, socket creation, bind, or the
 *         follow-up listenSocket(int) setup fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Big enough for any int including its sign, so an out-of-range port_
  // cannot overflow the buffer (snprintf also bounds the write).
  char port[sizeof("-2147483648")];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail like the other setup errors below instead of silently returning
    // with no server socket, which registerEvents() would then trip over.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Otherwise fall back to the last entry.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Only an AF_INET6 socket has the IPV6_V6ONLY option; setting it on an
  // IPv4 socket would just fail and log a spurious error.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
672
673/**
674 * Takes a socket created by listenSocket() and sets various options on it
675 * to prepare for use in the server.
676 */
677void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000678 // Set socket to nonblocking mode
679 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000680 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
681 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
682 close(s);
683 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000684 }
685
686 int one = 1;
687 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000688
689 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000690 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000691
692 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000693 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000694
695 // Set TCP nodelay if available, MAC OS X Hack
696 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
697 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000698 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000699 #endif
700
Mark Slee79b16942007-11-26 19:05:29 +0000701 if (listen(s, LISTEN_BACKLOG) == -1) {
702 close(s);
703 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000704 }
705
Mark Slee79b16942007-11-26 19:05:29 +0000706 // Cool, this socket is good to go, set it as the serverSocket_
707 serverSocket_ = s;
708}
709
710/**
711 * Register the core libevent events onto the proper base.
712 */
713void TNonblockingServer::registerEvents(event_base* base) {
714 assert(serverSocket_ != -1);
715 assert(!eventBase_);
716 eventBase_ = base;
717
718 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000719 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000720 event_get_version(),
721 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000722
723 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000724 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000725 serverSocket_,
726 EV_READ | EV_PERSIST,
727 TNonblockingServer::eventHandler,
728 this);
Mark Slee79b16942007-11-26 19:05:29 +0000729 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000730
731 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000732 if (-1 == event_add(&serverEvent_, 0)) {
733 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000734 }
Mark Slee79b16942007-11-26 19:05:29 +0000735}
736
737/**
738 * Main workhorse function, starts up the server listening on a port and
739 * loops over the libevent handler.
740 */
741void TNonblockingServer::serve() {
742 // Init socket
743 listenSocket();
744
745 // Initialize libevent core
746 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000747
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000748 // Run the preServe event
749 if (eventHandler_ != NULL) {
750 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000751 }
752
Mark Sleee02385b2007-06-09 01:21:16 +0000753 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000754 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000755}
756
T Jake Lucianib5e62212009-01-31 22:36:20 +0000757}}} // apache::thrift::server