blob: cb1e27b8c1b89efaea6206484848ac0218935105 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Bryan Duxbury76c43682011-08-24 21:26:48 +000033#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000034#endif
35
36#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000037#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <fcntl.h>
41#include <errno.h>
42#include <assert.h>
43
David Reiss9b903442009-10-21 05:51:28 +000044#ifndef AF_LOCAL
45#define AF_LOCAL AF_UNIX
46#endif
47
T Jake Lucianib5e62212009-01-31 22:36:20 +000048namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000053using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000054using apache::thrift::transport::TSocket;
55using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000056
/**
 * Socket-level I/O mode of a connection; selects what workSocket() does the
 * next time libevent reports activity on the descriptor.
 */
enum TSocketState {
  SOCKET_RECV_FRAMING,  ///< receiving the 4-byte frame-size prefix
  SOCKET_RECV,          ///< receiving the frame payload
  SOCKET_SEND           ///< sending the response frame
};
62};
63
64/**
65 * Five states for the nonblocking server:
66 * 1) initialize
67 * 2) read 4 byte frame size
68 * 3) read frame of data
69 * 4) send back data (if any)
70 * 5) force immediate connection close
71 */
72enum TAppState {
73 APP_INIT,
74 APP_READ_FRAME_SIZE,
75 APP_READ_REQUEST,
76 APP_WAIT_TASK,
77 APP_SEND_RESULT,
78 APP_CLOSE_CONNECTION
79};
80
81/**
82 * Represents a connection that is handled via libevent. This connection
83 * essentially encapsulates a socket that has some associated libevent state.
84 */
85class TNonblockingServer::TConnection {
86 private:
87
88 /// Server handle
89 TNonblockingServer* server_;
90
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +000091 /// TProcessor
92 boost::shared_ptr<TProcessor> processor_;
93
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000094 /// Object wrapping network socket
95 boost::shared_ptr<TSocket> tSocket_;
96
97 /// Libevent object
98 struct event event_;
99
100 /// Libevent flags
101 short eventFlags_;
102
103 /// Socket mode
104 TSocketState socketState_;
105
106 /// Application state
107 TAppState appState_;
108
109 /// How much data needed to read
110 uint32_t readWant_;
111
112 /// Where in the read buffer are we
113 uint32_t readBufferPos_;
114
115 /// Read buffer
116 uint8_t* readBuffer_;
117
118 /// Read buffer size
119 uint32_t readBufferSize_;
120
121 /// Write buffer
122 uint8_t* writeBuffer_;
123
124 /// Write buffer size
125 uint32_t writeBufferSize_;
126
127 /// How far through writing are we?
128 uint32_t writeBufferPos_;
129
130 /// Largest size of write buffer seen since buffer was constructed
131 size_t largestWriteBufferSize_;
132
133 /// Count of the number of calls for use with getResizeBufferEveryN().
134 int32_t callsForResize_;
135
136 /// Task handle
137 int taskHandle_;
138
139 /// Task event
140 struct event taskEvent_;
141
142 /// Transport to read from
143 boost::shared_ptr<TMemoryBuffer> inputTransport_;
144
145 /// Transport that processor writes to
146 boost::shared_ptr<TMemoryBuffer> outputTransport_;
147
148 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
149 boost::shared_ptr<TTransport> factoryInputTransport_;
150 boost::shared_ptr<TTransport> factoryOutputTransport_;
151
152 /// Protocol decoder
153 boost::shared_ptr<TProtocol> inputProtocol_;
154
155 /// Protocol encoder
156 boost::shared_ptr<TProtocol> outputProtocol_;
157
158 /// Server event handler, if any
159 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
160
161 /// Thrift call context, if any
162 void *connectionContext_;
163
164 /// Go into read mode
165 void setRead() {
166 setFlags(EV_READ | EV_PERSIST);
167 }
168
169 /// Go into write mode
170 void setWrite() {
171 setFlags(EV_WRITE | EV_PERSIST);
172 }
173
174 /// Set socket idle
175 void setIdle() {
176 setFlags(0);
177 }
178
179 /**
180 * Set event flags for this connection.
181 *
182 * @param eventFlags flags we pass to libevent for the connection.
183 */
184 void setFlags(short eventFlags);
185
186 /**
187 * Libevent handler called (via our static wrapper) when the connection
188 * socket had something happen. Rather than use the flags libevent passed,
189 * we use the connection state to determine whether we need to read or
190 * write the socket.
191 */
192 void workSocket();
193
194 /// Close this connection and free or reset its resources.
195 void close();
196
197 public:
198
199 class Task;
200
201 /// Constructor
202 TConnection(int socket, short eventFlags, TNonblockingServer *s,
203 const sockaddr* addr, socklen_t addrLen) {
204 readBuffer_ = NULL;
205 readBufferSize_ = 0;
206
207 // Allocate input and output transports
208 // these only need to be allocated once per TConnection (they don't need to be
209 // reallocated on init() call)
210 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
211 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
212 tSocket_.reset(new TSocket());
213
214 init(socket, eventFlags, s, addr, addrLen);
215 server_->incrementNumConnections();
216 }
217
218 ~TConnection() {
219 std::free(readBuffer_);
220 server_->decrementNumConnections();
221 }
222
223 /**
224 * Check buffers against any size limits and shrink it if exceeded.
225 *
226 * @param readLimit we reduce read buffer size to this (if nonzero).
227 * @param writeLimit if nonzero and write buffer is larger, replace it.
228 */
229 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
230
231 /// Initialize
232 void init(int socket, short eventFlags, TNonblockingServer *s,
233 const sockaddr* addr, socklen_t addrLen);
234
235 /**
236 * This is called when the application transitions from one state into
237 * another. This means that it has finished writing the data that it needed
238 * to, or finished receiving the data that it needed to.
239 */
240 void transition();
241
242 /**
243 * C-callable event handler for connection events. Provides a callback
244 * that libevent can understand which invokes connection_->workSocket().
245 *
246 * @param fd the descriptor the event occurred on.
247 * @param which the flags associated with the event.
248 * @param v void* callback arg where we placed TConnection's "this".
249 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000250 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000251 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
252 ((TConnection*)v)->workSocket();
253 }
254
255 /**
256 * C-callable event handler for signaling task completion. Provides a
257 * callback that libevent can understand that will read a connection
258 * object's address from a pipe and call connection->transition() for
259 * that object.
260 *
261 * @param fd the descriptor the event occurred on.
262 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000263 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000264 TConnection* connection;
265 ssize_t nBytes;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000266 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000267 == sizeof(TConnection*)) {
268 connection->transition();
269 }
270 if (nBytes > 0) {
271 throw TException("TConnection::taskHandler unexpected partial read");
272 }
273 if (errno != EWOULDBLOCK && errno != EAGAIN) {
274 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
275 }
276 }
277
278 /**
279 * Notification to server that processing has ended on this request.
280 * Can be called either when processing is completed or when a waiting
281 * task has been preemptively terminated (on overload).
282 *
283 * @return true if successful, false if unable to notify (check errno).
284 */
285 bool notifyServer() {
286 TConnection* connection = this;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000287 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
288 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000289 return false;
290 }
291
292 return true;
293 }
294
295 /// Force connection shutdown for this connection.
296 void forceClose() {
297 appState_ = APP_CLOSE_CONNECTION;
298 if (!notifyServer()) {
299 throw TException("TConnection::forceClose: failed write on notify pipe");
300 }
301 }
302
303 /// return the server this connection was initialized for.
304 TNonblockingServer* getServer() {
305 return server_;
306 }
307
308 /// get state of connection.
309 TAppState getState() {
310 return appState_;
311 }
312
313 /// return the TSocket transport wrapping this network connection
314 boost::shared_ptr<TSocket> getTSocket() const {
315 return tSocket_;
316 }
317
318 /// return the server event handler if any
319 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
320 return serverEventHandler_;
321 }
322
323 /// return the Thrift connection context if any
324 void* getConnectionContext() {
325 return connectionContext_;
326 }
327
328};
329
330class TNonblockingServer::TConnection::Task: public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000331 public:
332 Task(boost::shared_ptr<TProcessor> processor,
333 boost::shared_ptr<TProtocol> input,
334 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +0000335 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +0000336 processor_(processor),
337 input_(input),
338 output_(output),
David Reiss105961d2010-10-06 17:10:17 +0000339 connection_(connection),
340 serverEventHandler_(connection_->getServerEventHandler()),
341 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000342
343 void run() {
344 try {
David Reiss105961d2010-10-06 17:10:17 +0000345 for (;;) {
346 if (serverEventHandler_ != NULL) {
347 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
348 }
349 if (!processor_->process(input_, output_, connectionContext_) ||
350 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000351 break;
352 }
353 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000354 } catch (const TTransportException& ttx) {
355 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
356 } catch (const bad_alloc&) {
357 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +0000358 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000359 } catch (const std::exception& x) {
360 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
361 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000362 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000363 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +0000364 }
Mark Slee79b16942007-11-26 19:05:29 +0000365
David Reiss01fe1532010-03-09 05:19:25 +0000366 // Signal completion back to the libevent thread via a pipe
367 if (!connection_->notifyServer()) {
368 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000369 }
David Reiss01fe1532010-03-09 05:19:25 +0000370 }
371
372 TConnection* getTConnection() {
373 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000374 }
375
376 private:
377 boost::shared_ptr<TProcessor> processor_;
378 boost::shared_ptr<TProtocol> input_;
379 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000380 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000381 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
382 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000383};
Mark Slee5ea15f92007-03-05 22:55:59 +0000384
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000385void TNonblockingServer::TConnection::init(int socket, short eventFlags,
386 TNonblockingServer* s,
387 const sockaddr* addr,
388 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000389 tSocket_->setSocketFD(socket);
390 tSocket_->setCachedAddress(addr, addrLen);
391
Mark Slee2f6404d2006-10-10 01:37:40 +0000392 server_ = s;
393 appState_ = APP_INIT;
394 eventFlags_ = 0;
395
396 readBufferPos_ = 0;
397 readWant_ = 0;
398
399 writeBuffer_ = NULL;
400 writeBufferSize_ = 0;
401 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000402 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000403
David Reiss89a12942010-10-06 17:10:52 +0000404 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000406 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000407
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 // Set flags, which also registers the event
409 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000410
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000411 // get input/transports
412 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
413 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000414
415 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000416 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
417 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000418
419 // Set up for any server event handler
420 serverEventHandler_ = server_->getEventHandler();
421 if (serverEventHandler_ != NULL) {
422 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
423 } else {
424 connectionContext_ = NULL;
425 }
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000426
427 // Get the processor
428 processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000429}
430
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000431void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000432 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000433 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000434
435 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000436 case SOCKET_RECV_FRAMING:
437 union {
438 uint8_t buf[sizeof(uint32_t)];
439 int32_t size;
440 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000441
David Reiss89a12942010-10-06 17:10:52 +0000442 // if we've already received some bytes we kept them here
443 framing.size = readWant_;
444 // determine size of this frame
445 try {
446 // Read from the socket
447 fetch = tSocket_->read(&framing.buf[readBufferPos_],
448 uint32_t(sizeof(framing.size) - readBufferPos_));
449 if (fetch == 0) {
450 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 close();
452 return;
453 }
David Reiss89a12942010-10-06 17:10:52 +0000454 readBufferPos_ += fetch;
455 } catch (TTransportException& te) {
456 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
457 close();
458
459 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 }
461
David Reiss89a12942010-10-06 17:10:52 +0000462 if (readBufferPos_ < sizeof(framing.size)) {
463 // more needed before frame size is known -- save what we have so far
464 readWant_ = framing.size;
465 return;
466 }
467
468 readWant_ = ntohl(framing.size);
469 if (static_cast<int>(readWant_) <= 0) {
470 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
471 close();
472 return;
473 }
474 // size known; now get the rest of the frame
475 transition();
476 return;
477
478 case SOCKET_RECV:
479 // It is an error to be in this state if we already have all the data
480 assert(readBufferPos_ < readWant_);
481
David Reiss105961d2010-10-06 17:10:17 +0000482 try {
483 // Read from the socket
484 fetch = readWant_ - readBufferPos_;
485 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
486 }
487 catch (TTransportException& te) {
488 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
489 close();
Mark Slee79b16942007-11-26 19:05:29 +0000490
David Reiss105961d2010-10-06 17:10:17 +0000491 return;
492 }
493
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 if (got > 0) {
495 // Move along in the buffer
496 readBufferPos_ += got;
497
498 // Check that we did not overdo it
499 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000500
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 // We are done reading, move onto the next state
502 if (readBufferPos_ == readWant_) {
503 transition();
504 }
505 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000506 }
507
508 // Whenever we get down here it means a remote disconnect
509 close();
Mark Slee79b16942007-11-26 19:05:29 +0000510
Mark Slee2f6404d2006-10-10 01:37:40 +0000511 return;
512
513 case SOCKET_SEND:
514 // Should never have position past size
515 assert(writeBufferPos_ <= writeBufferSize_);
516
517 // If there is no data to send, then let us move on
518 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000519 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000520 transition();
521 return;
522 }
523
David Reiss105961d2010-10-06 17:10:17 +0000524 try {
525 left = writeBufferSize_ - writeBufferPos_;
526 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
527 }
528 catch (TTransportException& te) {
529 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 close();
531 return;
532 }
533
534 writeBufferPos_ += sent;
535
536 // Did we overdo it?
537 assert(writeBufferPos_ <= writeBufferSize_);
538
Mark Slee79b16942007-11-26 19:05:29 +0000539 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000540 if (writeBufferPos_ == writeBufferSize_) {
541 transition();
542 }
543
544 return;
545
546 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000547 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000548 assert(0);
549 }
550}
551
552/**
553 * This is called when the application transitions from one state into
554 * another. This means that it has finished writing the data that it needed
555 * to, or finished receiving the data that it needed to.
556 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000557void TNonblockingServer::TConnection::transition() {
558
Mark Slee2f6404d2006-10-10 01:37:40 +0000559 // Switch upon the state that we are currently in and move to a new state
560 switch (appState_) {
561
562 case APP_READ_REQUEST:
563 // We are done reading the request, package the read buffer into transport
564 // and get back some data from the dispatch function
565 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000566 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000567 // Prepend four bytes of blank space to the buffer so we can
568 // write the frame size there later.
569 outputTransport_->getWritePtr(4);
570 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000571
David Reiss01fe1532010-03-09 05:19:25 +0000572 server_->incrementActiveProcessors();
573
Mark Sleee02385b2007-06-09 01:21:16 +0000574 if (server_->isThreadPoolProcessing()) {
575 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000576
David Reiss01fe1532010-03-09 05:19:25 +0000577 // Create task and dispatch to the thread manager
578 boost::shared_ptr<Runnable> task =
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000579 boost::shared_ptr<Runnable>(new Task(processor_,
David Reiss01fe1532010-03-09 05:19:25 +0000580 inputProtocol_,
581 outputProtocol_,
582 this));
583 // The application is now waiting on the task to finish
584 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000585
David Reisse11f3072008-10-07 21:39:19 +0000586 try {
587 server_->addTask(task);
588 } catch (IllegalStateException & ise) {
589 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000590 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000591 close();
592 }
Mark Slee402ee282007-08-23 01:43:20 +0000593
David Reiss01fe1532010-03-09 05:19:25 +0000594 // Set this connection idle so that libevent doesn't process more
595 // data on it while we're still waiting for the threadmanager to
596 // finish this task
597 setIdle();
598 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000599 } else {
600 try {
601 // Invoke the processor
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000602 processor_->process(inputProtocol_, outputProtocol_,
603 connectionContext_);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000604 } catch (const TTransportException &ttx) {
605 GlobalOutput.printf("TNonblockingServer transport error in "
606 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000607 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000608 close();
609 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000610 } catch (const std::exception &x) {
611 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
612 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000613 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000614 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000615 return;
616 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000617 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000618 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000619 close();
620 return;
621 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 }
623
Mark Slee402ee282007-08-23 01:43:20 +0000624 // Intentionally fall through here, the call to process has written into
625 // the writeBuffer_
626
Mark Sleee02385b2007-06-09 01:21:16 +0000627 case APP_WAIT_TASK:
628 // We have now finished processing a task and the result has been written
629 // into the outputTransport_, so we grab its contents and place them into
630 // the writeBuffer_ for actual writing by the libevent thread
631
David Reiss01fe1532010-03-09 05:19:25 +0000632 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000633 // Get the result of the operation
634 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
635
636 // If the function call generated return data, then move into the send
637 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000638 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000639 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000640
641 // Move into write state
642 writeBufferPos_ = 0;
643 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000644
David Reissaf787782008-07-03 20:29:34 +0000645 // Put the frame size into the write buffer
646 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
647 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000648
649 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000650 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000651 setWrite();
652
653 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000654 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000655
656 return;
657 }
658
David Reissc51986f2009-03-24 20:01:25 +0000659 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000660 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000661 goto LABEL_APP_INIT;
662
Mark Slee2f6404d2006-10-10 01:37:40 +0000663 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000664 // it's now safe to perform buffer size housekeeping.
665 if (writeBufferSize_ > largestWriteBufferSize_) {
666 largestWriteBufferSize_ = writeBufferSize_;
667 }
668 if (server_->getResizeBufferEveryN() > 0
669 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
670 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
671 server_->getIdleWriteBufferLimit());
672 callsForResize_ = 0;
673 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000674
675 // N.B.: We also intentionally fall through here into the INIT state!
676
Mark Slee92f00fb2006-10-25 01:28:17 +0000677 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000678 case APP_INIT:
679
680 // Clear write buffer variables
681 writeBuffer_ = NULL;
682 writeBufferPos_ = 0;
683 writeBufferSize_ = 0;
684
Mark Slee2f6404d2006-10-10 01:37:40 +0000685 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000686 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000687 appState_ = APP_READ_FRAME_SIZE;
688
David Reiss89a12942010-10-06 17:10:52 +0000689 readBufferPos_ = 0;
690
Mark Slee2f6404d2006-10-10 01:37:40 +0000691 // Register read event
692 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000693
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000695 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000696
697 return;
698
699 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000700 // We just read the request length
701 // Double the buffer size until it is big enough
702 if (readWant_ > readBufferSize_) {
703 if (readBufferSize_ == 0) {
704 readBufferSize_ = 1;
705 }
706 uint32_t newSize = readBufferSize_;
707 while (readWant_ > newSize) {
708 newSize *= 2;
709 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000710
David Reiss89a12942010-10-06 17:10:52 +0000711 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
712 if (newBuffer == NULL) {
713 // nothing else to be done...
714 throw std::bad_alloc();
715 }
716 readBuffer_ = newBuffer;
717 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000718 }
719
Mark Slee2f6404d2006-10-10 01:37:40 +0000720 readBufferPos_= 0;
721
722 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000723 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000724 appState_ = APP_READ_REQUEST;
725
726 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000727 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000728
729 return;
730
David Reiss01fe1532010-03-09 05:19:25 +0000731 case APP_CLOSE_CONNECTION:
732 server_->decrementActiveProcessors();
733 close();
734 return;
735
Mark Slee2f6404d2006-10-10 01:37:40 +0000736 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000737 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000738 assert(0);
739 }
740}
741
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000742void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000743 // Catch the do nothing case
744 if (eventFlags_ == eventFlags) {
745 return;
746 }
747
748 // Delete a previously existing event
749 if (eventFlags_ != 0) {
750 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000751 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000752 return;
753 }
754 }
755
756 // Update in memory structure
757 eventFlags_ = eventFlags;
758
Mark Slee402ee282007-08-23 01:43:20 +0000759 // Do not call event_set if there are no flags
760 if (!eventFlags_) {
761 return;
762 }
763
David Reiss01fe1532010-03-09 05:19:25 +0000764 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000765 * event_set:
766 *
767 * Prepares the event structure &event to be used in future calls to
768 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000769 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 *
771 * The events can be either EV_READ, EV_WRITE, or both, indicating
772 * that an application can read or write from the file respectively without
773 * blocking.
774 *
Mark Sleee02385b2007-06-09 01:21:16 +0000775 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000776 * the event and the type of event which will be one of: EV_TIMEOUT,
777 * EV_SIGNAL, EV_READ, EV_WRITE.
778 *
779 * The additional flag EV_PERSIST makes an event_add() persistent until
780 * event_del() has been called.
781 *
782 * Once initialized, the &event struct can be used repeatedly with
783 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000784 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000785 * when an ev structure has been added to libevent using event_add() the
786 * structure must persist until the event occurs (assuming EV_PERSIST
787 * is not set) or is removed using event_del(). You may not reuse the same
788 * ev structure for multiple monitored descriptors; each descriptor needs
789 * its own ev.
790 */
David Reiss105961d2010-10-06 17:10:17 +0000791 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
792 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000793 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000794
795 // Add the event
796 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000797 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000798 }
799}
800
801/**
802 * Closes a connection
803 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000804void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000805 // Delete the registered libevent
806 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000807 GlobalOutput.perror("TConnection::close() event_del", errno);
808 }
809
810 if (serverEventHandler_ != NULL) {
811 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000812 }
813
814 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000815 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000816
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000817 // close any factory produced transports
818 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000819 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000820
Mark Slee2f6404d2006-10-10 01:37:40 +0000821 // Give this object back to the server that owns it
822 server_->returnConnection(this);
823}
824
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000825void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
826 size_t readLimit,
827 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000828 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000829 free(readBuffer_);
830 readBuffer_ = NULL;
831 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000832 }
David Reiss54bec5d2010-10-06 17:10:45 +0000833
834 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
835 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000836 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000837 largestWriteBufferSize_ = 0;
838 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000839}
840
David Reiss8ede8182010-09-02 15:26:28 +0000841TNonblockingServer::~TNonblockingServer() {
842 // TODO: We currently leak any active TConnection objects.
843 // Since we're shutting down and destroying the event_base, the TConnection
844 // objects will never receive any additional callbacks. (And even if they
845 // did, it would be bad, since they keep a pointer around to the server,
846 // which is being destroyed.)
847
848 // Clean up unused TConnection objects in connectionStack_
849 while (!connectionStack_.empty()) {
850 TConnection* connection = connectionStack_.top();
851 connectionStack_.pop();
852 delete connection;
853 }
854
Roger Meierc1905582011-08-02 23:37:36 +0000855 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000856 event_base_free(eventBase_);
857 }
858
859 if (serverSocket_ >= 0) {
860 close(serverSocket_);
861 }
862}
863
Mark Slee2f6404d2006-10-10 01:37:40 +0000864/**
865 * Creates a new connection either by reusing an object off the stack or
866 * by allocating a new one entirely
867 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000868TNonblockingServer::TConnection* TNonblockingServer::createConnection(
869 int socket, short flags,
870 const sockaddr* addr,
871 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000872 // Check the stack
873 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000874 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000875 } else {
876 TConnection* result = connectionStack_.top();
877 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000878 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000879 return result;
880 }
881}
882
883/**
884 * Returns a connection to the stack
885 */
886void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000887 if (connectionStackLimit_ &&
888 (connectionStack_.size() >= connectionStackLimit_)) {
889 delete connection;
890 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000891 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000892 connectionStack_.push(connection);
893 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000894}
895
896/**
David Reissa79e4882008-03-05 07:51:47 +0000897 * Server socket had something happen. We accept all waiting client
898 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000899 */
900void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000901 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000902 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000903 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000904
Mark Slee2f6404d2006-10-10 01:37:40 +0000905 // Server socket accepted a new connection
906 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000907 sockaddr_storage addrStorage;
908 sockaddr* addrp = (sockaddr*)&addrStorage;
909 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000910
Mark Slee2f6404d2006-10-10 01:37:40 +0000911 // Going to accept a new client socket
912 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000913
Mark Slee2f6404d2006-10-10 01:37:40 +0000914 // Accept as many new clients as possible, even though libevent signaled only
915 // one, this helps us to avoid having to go back into the libevent engine so
916 // many times
David Reiss105961d2010-10-06 17:10:17 +0000917 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000918 // If we're overloaded, take action here
919 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
920 nConnectionsDropped_++;
921 nTotalConnectionsDropped_++;
922 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
923 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000924 return;
David Reiss01fe1532010-03-09 05:19:25 +0000925 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
926 if (!drainPendingTask()) {
927 // Nothing left to discard, so we drop connection instead.
928 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000929 return;
David Reiss01fe1532010-03-09 05:19:25 +0000930 }
931 }
932 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000933 // Explicitly set this socket to NONBLOCK mode
934 int flags;
935 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
936 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000937 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000938 close(clientSocket);
939 return;
940 }
941
942 // Create a new TConnection for this client socket.
943 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000944 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000945
946 // Fail fast if we could not create a TConnection object
947 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000948 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000949 close(clientSocket);
950 return;
951 }
952
953 // Put this client connection into the proper state
954 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000955
956 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000957 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000958 }
Mark Slee79b16942007-11-26 19:05:29 +0000959
Mark Slee2f6404d2006-10-10 01:37:40 +0000960 // Done looping accept, now we have to make sure the error is due to
961 // blocking. Any other error is a problem
962 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000963 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000964 }
965}
966
/**
 * Creates the listening socket for port_ on the wildcard address and binds
 * it. The socket is then handed to listenSocket(int) for option setup and
 * listen(). An IPv6 address is preferred when available, since IPv4 clients
 * can reach it through IPv4-mapped addresses.
 *
 * @throws TException if address resolution, socket creation or bind fails.
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // Sized for any 32-bit int (including a sign). The old buffer only fit
  // valid port numbers, so an out-of-range port_ overflowed it.
  char port[sizeof("-2147483648")];
  sprintf(port, "%d", port_);

  // Wildcard address
  struct addrinfo* res0 = NULL;
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Without an address there is nothing to listen on. Fail loudly so that
    // serve() does not carry on with an invalid serverSocket_.
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    throw TException(errStr);
  }

  // Pick the IPv6 address first (falling back to the last entry) since
  // IPv4 addresses can be mapped into IPv6 space.
  struct addrinfo* res;
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    // Log before freeaddrinfo() so errno is still the socket() error.
    GlobalOutput.perror("TNonblockingServer::serve() socket() ", errno);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear V6ONLY so this socket also accepts IPv4-mapped connections.
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TNonblockingServer::listenSocket() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  // Set reuseaddr to avoid 2MSL delay on server restart
  int one = 1;
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    // Log before close()/freeaddrinfo() so errno is still the bind() error.
    GlobalOutput.perror("TNonblockingServer::serve() bind ", errno);
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1031
1032/**
1033 * Takes a socket created by listenSocket() and sets various options on it
1034 * to prepare for use in the server.
1035 */
1036void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001037 // Set socket to nonblocking mode
1038 int flags;
Mark Slee79b16942007-11-26 19:05:29 +00001039 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
1040 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
1041 close(s);
1042 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001043 }
1044
1045 int one = 1;
1046 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001047
1048 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001049 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001050
1051 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001052 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001053
1054 // Set TCP nodelay if available, MAC OS X Hack
1055 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1056 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001057 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001058 #endif
1059
David Reiss1c20c872010-03-09 05:20:14 +00001060 #ifdef TCP_LOW_MIN_RTO
1061 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001062 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001063 }
1064 #endif
1065
Mark Slee79b16942007-11-26 19:05:29 +00001066 if (listen(s, LISTEN_BACKLOG) == -1) {
1067 close(s);
1068 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001069 }
1070
Mark Slee79b16942007-11-26 19:05:29 +00001071 // Cool, this socket is good to go, set it as the serverSocket_
1072 serverSocket_ = s;
1073}
1074
David Reiss01fe1532010-03-09 05:19:25 +00001075void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +00001076 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1077 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1078 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +00001079 }
Roger Meier30aae0c2011-07-08 12:23:31 +00001080 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
1081 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +00001082 close(notificationPipeFDs_[0]);
1083 close(notificationPipeFDs_[1]);
1084 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1085 }
David Reiss01fe1532010-03-09 05:19:25 +00001086}
1087
Mark Slee79b16942007-11-26 19:05:29 +00001088/**
1089 * Register the core libevent events onto the proper base.
1090 */
Roger Meierc1905582011-08-02 23:37:36 +00001091void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +00001092 assert(serverSocket_ != -1);
1093 assert(!eventBase_);
1094 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +00001095 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +00001096
1097 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +00001098 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +00001099 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001100 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +00001101
1102 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +00001103 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +00001104 serverSocket_,
1105 EV_READ | EV_PERSIST,
1106 TNonblockingServer::eventHandler,
1107 this);
Mark Slee79b16942007-11-26 19:05:29 +00001108 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +00001109
1110 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +00001111 if (-1 == event_add(&serverEvent_, 0)) {
1112 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +00001113 }
David Reiss01fe1532010-03-09 05:19:25 +00001114 if (threadPoolProcessing_) {
1115 // Create an event to be notified when a task finishes
1116 event_set(&notificationEvent_,
1117 getNotificationRecvFD(),
1118 EV_READ | EV_PERSIST,
1119 TConnection::taskHandler,
1120 this);
David Reiss1c20c872010-03-09 05:20:14 +00001121
David Reiss01fe1532010-03-09 05:19:25 +00001122 // Attach to the base
1123 event_base_set(eventBase_, &notificationEvent_);
1124
1125 // Add the event and start up the server
1126 if (-1 == event_add(&notificationEvent_, 0)) {
1127 throw TException("TNonblockingServer::serve(): notification event_add fail");
1128 }
1129 }
1130}
1131
David Reiss068f4162010-03-09 05:19:45 +00001132void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1133 threadManager_ = threadManager;
1134 if (threadManager != NULL) {
1135 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1136 threadPoolProcessing_ = true;
1137 } else {
1138 threadPoolProcessing_ = false;
1139 }
1140}
1141
David Reiss01fe1532010-03-09 05:19:25 +00001142bool TNonblockingServer::serverOverloaded() {
1143 size_t activeConnections = numTConnections_ - connectionStack_.size();
1144 if (numActiveProcessors_ > maxActiveProcessors_ ||
1145 activeConnections > maxConnections_) {
1146 if (!overloaded_) {
1147 GlobalOutput.printf("thrift non-blocking server overload condition");
1148 overloaded_ = true;
1149 }
1150 } else {
1151 if (overloaded_ &&
1152 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
1153 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1154 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
1155 nConnectionsDropped_, nTotalConnectionsDropped_);
1156 nConnectionsDropped_ = 0;
1157 overloaded_ = false;
1158 }
1159 }
1160
1161 return overloaded_;
1162}
1163
1164bool TNonblockingServer::drainPendingTask() {
1165 if (threadManager_) {
1166 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1167 if (task) {
1168 TConnection* connection =
1169 static_cast<TConnection::Task*>(task.get())->getTConnection();
1170 assert(connection && connection->getServer()
1171 && connection->getState() == APP_WAIT_TASK);
1172 connection->forceClose();
1173 return true;
1174 }
1175 }
1176 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001177}
1178
David Reiss068f4162010-03-09 05:19:45 +00001179void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1180 TConnection* connection =
1181 static_cast<TConnection::Task*>(task.get())->getTConnection();
1182 assert(connection && connection->getServer()
1183 && connection->getState() == APP_WAIT_TASK);
1184 connection->forceClose();
1185}
1186
Mark Slee79b16942007-11-26 19:05:29 +00001187/**
1188 * Main workhorse function, starts up the server listening on a port and
1189 * loops over the libevent handler.
1190 */
1191void TNonblockingServer::serve() {
1192 // Init socket
1193 listenSocket();
1194
David Reiss01fe1532010-03-09 05:19:25 +00001195 if (threadPoolProcessing_) {
1196 // Init task completion notification pipe
1197 createNotificationPipe();
1198 }
1199
Mark Slee79b16942007-11-26 19:05:29 +00001200 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001201 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +00001202
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001203 // Run the preServe event
1204 if (eventHandler_ != NULL) {
1205 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001206 }
1207
Bryan Duxbury76c43682011-08-24 21:26:48 +00001208 // Run libevent engine, invokes calls to eventHandler
1209 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +00001210 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +00001211}
1212
Bryan Duxbury76c43682011-08-24 21:26:48 +00001213void TNonblockingServer::stop() {
1214 if (!eventBase_) {
1215 return;
1216 }
1217
1218 // Call event_base_loopbreak() to tell libevent to exit the loop
1219 //
1220 // (The libevent documentation doesn't explicitly state that this function is
1221 // safe to call from another thread. However, all it does is set a variable,
1222 // in the event_base, so it should be fine.)
1223 event_base_loopbreak(eventBase_);
1224
1225 // event_base_loopbreak() only causes the loop to exit the next time it wakes
1226 // up. We need to force it to wake up, in case there are no real events
1227 // it needs to process.
1228 //
1229 // Attempt to connect to the server socket. If anything fails,
1230 // we'll just have to wait until libevent wakes up on its own.
1231 //
1232 // First create a socket
1233 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
1234 if (fd < 0) {
1235 return;
1236 }
1237
1238 // Set up the address
1239 struct sockaddr_in addr;
1240 addr.sin_family = AF_INET;
1241 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
1242 addr.sin_port = htons(port_);
1243
1244 // Finally do the connect().
1245 // We don't care about the return value;
1246 // we're just going to close the socket either way.
1247 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
1248 close(fd);
1249}
1250
T Jake Lucianib5e62212009-01-31 22:36:20 +00001251}}} // apache::thrift::server