blob: c331edabd832a8383337ae7cda5e7839cffb1f57 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier2fa9c312011-09-05 19:15:53 +000020#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
Mark Slee2f6404d2006-10-10 01:37:40 +000024#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000025#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000026#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000027
Mark Sleee02385b2007-06-09 01:21:16 +000028#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000029
30#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000032#endif
33
34#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000035#include <netinet/in.h>
36#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000037#endif
38
39#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000040#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000041#endif
42
43#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000044#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000045#endif
46
Roger Meier2fa9c312011-09-05 19:15:53 +000047#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000048#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000049#endif
50
Mark Slee2f6404d2006-10-10 01:37:40 +000051#include <errno.h>
52#include <assert.h>
53
David Reiss9b903442009-10-21 05:51:28 +000054#ifndef AF_LOCAL
55#define AF_LOCAL AF_UNIX
56#endif
57
T Jake Lucianib5e62212009-01-31 22:36:20 +000058namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000059
T Jake Lucianib5e62212009-01-31 22:36:20 +000060using namespace apache::thrift::protocol;
61using namespace apache::thrift::transport;
62using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000063using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000064using apache::thrift::transport::TSocket;
65using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000066
/**
 * Three states for sockets: recv frame size, recv data, and send mode.
 */
enum TSocketState {
  /// Receiving the 4-byte frame-size header
  SOCKET_RECV_FRAMING,
  /// Receiving the frame payload
  SOCKET_RECV,
  /// Sending the response buffer
  SOCKET_SEND
};
73
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a dispatched task to finish
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
90
91/**
92 * Represents a connection that is handled via libevent. This connection
93 * essentially encapsulates a socket that has some associated libevent state.
94 */
95class TNonblockingServer::TConnection {
96 private:
97
98 /// Server handle
99 TNonblockingServer* server_;
100
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000101 /// TProcessor
102 boost::shared_ptr<TProcessor> processor_;
103
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000104 /// Object wrapping network socket
105 boost::shared_ptr<TSocket> tSocket_;
106
107 /// Libevent object
108 struct event event_;
109
110 /// Libevent flags
111 short eventFlags_;
112
113 /// Socket mode
114 TSocketState socketState_;
115
116 /// Application state
117 TAppState appState_;
118
119 /// How much data needed to read
120 uint32_t readWant_;
121
122 /// Where in the read buffer are we
123 uint32_t readBufferPos_;
124
125 /// Read buffer
126 uint8_t* readBuffer_;
127
128 /// Read buffer size
129 uint32_t readBufferSize_;
130
131 /// Write buffer
132 uint8_t* writeBuffer_;
133
134 /// Write buffer size
135 uint32_t writeBufferSize_;
136
137 /// How far through writing are we?
138 uint32_t writeBufferPos_;
139
140 /// Largest size of write buffer seen since buffer was constructed
141 size_t largestWriteBufferSize_;
142
143 /// Count of the number of calls for use with getResizeBufferEveryN().
144 int32_t callsForResize_;
145
146 /// Task handle
147 int taskHandle_;
148
149 /// Task event
150 struct event taskEvent_;
151
152 /// Transport to read from
153 boost::shared_ptr<TMemoryBuffer> inputTransport_;
154
155 /// Transport that processor writes to
156 boost::shared_ptr<TMemoryBuffer> outputTransport_;
157
158 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
159 boost::shared_ptr<TTransport> factoryInputTransport_;
160 boost::shared_ptr<TTransport> factoryOutputTransport_;
161
162 /// Protocol decoder
163 boost::shared_ptr<TProtocol> inputProtocol_;
164
165 /// Protocol encoder
166 boost::shared_ptr<TProtocol> outputProtocol_;
167
168 /// Server event handler, if any
169 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
170
171 /// Thrift call context, if any
172 void *connectionContext_;
173
174 /// Go into read mode
175 void setRead() {
176 setFlags(EV_READ | EV_PERSIST);
177 }
178
179 /// Go into write mode
180 void setWrite() {
181 setFlags(EV_WRITE | EV_PERSIST);
182 }
183
184 /// Set socket idle
185 void setIdle() {
186 setFlags(0);
187 }
188
189 /**
190 * Set event flags for this connection.
191 *
192 * @param eventFlags flags we pass to libevent for the connection.
193 */
194 void setFlags(short eventFlags);
195
196 /**
197 * Libevent handler called (via our static wrapper) when the connection
198 * socket had something happen. Rather than use the flags libevent passed,
199 * we use the connection state to determine whether we need to read or
200 * write the socket.
201 */
202 void workSocket();
203
204 /// Close this connection and free or reset its resources.
205 void close();
206
207 public:
208
209 class Task;
210
211 /// Constructor
212 TConnection(int socket, short eventFlags, TNonblockingServer *s,
213 const sockaddr* addr, socklen_t addrLen) {
214 readBuffer_ = NULL;
215 readBufferSize_ = 0;
216
217 // Allocate input and output transports
218 // these only need to be allocated once per TConnection (they don't need to be
219 // reallocated on init() call)
220 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
221 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
222 tSocket_.reset(new TSocket());
223
224 init(socket, eventFlags, s, addr, addrLen);
225 server_->incrementNumConnections();
226 }
227
228 ~TConnection() {
229 std::free(readBuffer_);
230 server_->decrementNumConnections();
231 }
232
233 /**
234 * Check buffers against any size limits and shrink it if exceeded.
235 *
236 * @param readLimit we reduce read buffer size to this (if nonzero).
237 * @param writeLimit if nonzero and write buffer is larger, replace it.
238 */
239 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
240
241 /// Initialize
242 void init(int socket, short eventFlags, TNonblockingServer *s,
243 const sockaddr* addr, socklen_t addrLen);
244
245 /**
246 * This is called when the application transitions from one state into
247 * another. This means that it has finished writing the data that it needed
248 * to, or finished receiving the data that it needed to.
249 */
250 void transition();
251
252 /**
253 * C-callable event handler for connection events. Provides a callback
254 * that libevent can understand which invokes connection_->workSocket().
255 *
256 * @param fd the descriptor the event occurred on.
257 * @param which the flags associated with the event.
258 * @param v void* callback arg where we placed TConnection's "this".
259 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000260 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000261 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
262 ((TConnection*)v)->workSocket();
263 }
264
265 /**
266 * C-callable event handler for signaling task completion. Provides a
267 * callback that libevent can understand that will read a connection
268 * object's address from a pipe and call connection->transition() for
269 * that object.
270 *
271 * @param fd the descriptor the event occurred on.
272 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000273 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000274 TConnection* connection;
275 ssize_t nBytes;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000276 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000277 == sizeof(TConnection*)) {
278 connection->transition();
279 }
280 if (nBytes > 0) {
281 throw TException("TConnection::taskHandler unexpected partial read");
282 }
283 if (errno != EWOULDBLOCK && errno != EAGAIN) {
284 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
285 }
286 }
287
288 /**
289 * Notification to server that processing has ended on this request.
290 * Can be called either when processing is completed or when a waiting
291 * task has been preemptively terminated (on overload).
292 *
293 * @return true if successful, false if unable to notify (check errno).
294 */
295 bool notifyServer() {
296 TConnection* connection = this;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000297 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
298 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000299 return false;
300 }
301
302 return true;
303 }
304
305 /// Force connection shutdown for this connection.
306 void forceClose() {
307 appState_ = APP_CLOSE_CONNECTION;
308 if (!notifyServer()) {
309 throw TException("TConnection::forceClose: failed write on notify pipe");
310 }
311 }
312
313 /// return the server this connection was initialized for.
314 TNonblockingServer* getServer() {
315 return server_;
316 }
317
318 /// get state of connection.
319 TAppState getState() {
320 return appState_;
321 }
322
323 /// return the TSocket transport wrapping this network connection
324 boost::shared_ptr<TSocket> getTSocket() const {
325 return tSocket_;
326 }
327
328 /// return the server event handler if any
329 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
330 return serverEventHandler_;
331 }
332
333 /// return the Thrift connection context if any
334 void* getConnectionContext() {
335 return connectionContext_;
336 }
337
338};
339
340class TNonblockingServer::TConnection::Task: public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000341 public:
342 Task(boost::shared_ptr<TProcessor> processor,
343 boost::shared_ptr<TProtocol> input,
344 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +0000345 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +0000346 processor_(processor),
347 input_(input),
348 output_(output),
David Reiss105961d2010-10-06 17:10:17 +0000349 connection_(connection),
350 serverEventHandler_(connection_->getServerEventHandler()),
351 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000352
353 void run() {
354 try {
David Reiss105961d2010-10-06 17:10:17 +0000355 for (;;) {
356 if (serverEventHandler_ != NULL) {
357 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
358 }
359 if (!processor_->process(input_, output_, connectionContext_) ||
360 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000361 break;
362 }
363 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000364 } catch (const TTransportException& ttx) {
365 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
366 } catch (const bad_alloc&) {
367 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +0000368 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000369 } catch (const std::exception& x) {
370 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
371 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000372 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000373 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +0000374 }
Mark Slee79b16942007-11-26 19:05:29 +0000375
David Reiss01fe1532010-03-09 05:19:25 +0000376 // Signal completion back to the libevent thread via a pipe
377 if (!connection_->notifyServer()) {
378 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000379 }
David Reiss01fe1532010-03-09 05:19:25 +0000380 }
381
382 TConnection* getTConnection() {
383 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000384 }
385
386 private:
387 boost::shared_ptr<TProcessor> processor_;
388 boost::shared_ptr<TProtocol> input_;
389 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000390 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000391 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
392 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000393};
Mark Slee5ea15f92007-03-05 22:55:59 +0000394
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000395void TNonblockingServer::TConnection::init(int socket, short eventFlags,
396 TNonblockingServer* s,
397 const sockaddr* addr,
398 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000399 tSocket_->setSocketFD(socket);
400 tSocket_->setCachedAddress(addr, addrLen);
401
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 server_ = s;
403 appState_ = APP_INIT;
404 eventFlags_ = 0;
405
406 readBufferPos_ = 0;
407 readWant_ = 0;
408
409 writeBuffer_ = NULL;
410 writeBufferSize_ = 0;
411 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000412 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000413
David Reiss89a12942010-10-06 17:10:52 +0000414 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000415 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000416 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000417
Mark Slee2f6404d2006-10-10 01:37:40 +0000418 // Set flags, which also registers the event
419 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000420
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000421 // get input/transports
422 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
423 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000424
425 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000426 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
427 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000428
429 // Set up for any server event handler
430 serverEventHandler_ = server_->getEventHandler();
431 if (serverEventHandler_ != NULL) {
432 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
433 } else {
434 connectionContext_ = NULL;
435 }
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000436
437 // Get the processor
438 processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000439}
440
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000441void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000442 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000443 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000444
445 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000446 case SOCKET_RECV_FRAMING:
447 union {
448 uint8_t buf[sizeof(uint32_t)];
449 int32_t size;
450 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000451
David Reiss89a12942010-10-06 17:10:52 +0000452 // if we've already received some bytes we kept them here
453 framing.size = readWant_;
454 // determine size of this frame
455 try {
456 // Read from the socket
457 fetch = tSocket_->read(&framing.buf[readBufferPos_],
458 uint32_t(sizeof(framing.size) - readBufferPos_));
459 if (fetch == 0) {
460 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 close();
462 return;
463 }
David Reiss89a12942010-10-06 17:10:52 +0000464 readBufferPos_ += fetch;
465 } catch (TTransportException& te) {
466 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
467 close();
468
469 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000470 }
471
David Reiss89a12942010-10-06 17:10:52 +0000472 if (readBufferPos_ < sizeof(framing.size)) {
473 // more needed before frame size is known -- save what we have so far
474 readWant_ = framing.size;
475 return;
476 }
477
478 readWant_ = ntohl(framing.size);
479 if (static_cast<int>(readWant_) <= 0) {
480 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
481 close();
482 return;
483 }
484 // size known; now get the rest of the frame
485 transition();
486 return;
487
488 case SOCKET_RECV:
489 // It is an error to be in this state if we already have all the data
490 assert(readBufferPos_ < readWant_);
491
David Reiss105961d2010-10-06 17:10:17 +0000492 try {
493 // Read from the socket
494 fetch = readWant_ - readBufferPos_;
495 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
496 }
497 catch (TTransportException& te) {
498 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
499 close();
Mark Slee79b16942007-11-26 19:05:29 +0000500
David Reiss105961d2010-10-06 17:10:17 +0000501 return;
502 }
503
Mark Slee2f6404d2006-10-10 01:37:40 +0000504 if (got > 0) {
505 // Move along in the buffer
506 readBufferPos_ += got;
507
508 // Check that we did not overdo it
509 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000510
Mark Slee2f6404d2006-10-10 01:37:40 +0000511 // We are done reading, move onto the next state
512 if (readBufferPos_ == readWant_) {
513 transition();
514 }
515 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000516 }
517
518 // Whenever we get down here it means a remote disconnect
519 close();
Mark Slee79b16942007-11-26 19:05:29 +0000520
Mark Slee2f6404d2006-10-10 01:37:40 +0000521 return;
522
523 case SOCKET_SEND:
524 // Should never have position past size
525 assert(writeBufferPos_ <= writeBufferSize_);
526
527 // If there is no data to send, then let us move on
528 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000529 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 transition();
531 return;
532 }
533
David Reiss105961d2010-10-06 17:10:17 +0000534 try {
535 left = writeBufferSize_ - writeBufferPos_;
536 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
537 }
538 catch (TTransportException& te) {
539 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000540 close();
541 return;
542 }
543
544 writeBufferPos_ += sent;
545
546 // Did we overdo it?
547 assert(writeBufferPos_ <= writeBufferSize_);
548
Mark Slee79b16942007-11-26 19:05:29 +0000549 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000550 if (writeBufferPos_ == writeBufferSize_) {
551 transition();
552 }
553
554 return;
555
556 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000557 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000558 assert(0);
559 }
560}
561
562/**
563 * This is called when the application transitions from one state into
564 * another. This means that it has finished writing the data that it needed
565 * to, or finished receiving the data that it needed to.
566 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000567void TNonblockingServer::TConnection::transition() {
568
Mark Slee2f6404d2006-10-10 01:37:40 +0000569 // Switch upon the state that we are currently in and move to a new state
570 switch (appState_) {
571
572 case APP_READ_REQUEST:
573 // We are done reading the request, package the read buffer into transport
574 // and get back some data from the dispatch function
575 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000576 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000577 // Prepend four bytes of blank space to the buffer so we can
578 // write the frame size there later.
579 outputTransport_->getWritePtr(4);
580 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000581
David Reiss01fe1532010-03-09 05:19:25 +0000582 server_->incrementActiveProcessors();
583
Mark Sleee02385b2007-06-09 01:21:16 +0000584 if (server_->isThreadPoolProcessing()) {
585 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000586
David Reiss01fe1532010-03-09 05:19:25 +0000587 // Create task and dispatch to the thread manager
588 boost::shared_ptr<Runnable> task =
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000589 boost::shared_ptr<Runnable>(new Task(processor_,
David Reiss01fe1532010-03-09 05:19:25 +0000590 inputProtocol_,
591 outputProtocol_,
592 this));
593 // The application is now waiting on the task to finish
594 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000595
David Reisse11f3072008-10-07 21:39:19 +0000596 try {
597 server_->addTask(task);
598 } catch (IllegalStateException & ise) {
599 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000600 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000601 close();
602 }
Mark Slee402ee282007-08-23 01:43:20 +0000603
David Reiss01fe1532010-03-09 05:19:25 +0000604 // Set this connection idle so that libevent doesn't process more
605 // data on it while we're still waiting for the threadmanager to
606 // finish this task
607 setIdle();
608 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000609 } else {
610 try {
611 // Invoke the processor
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000612 processor_->process(inputProtocol_, outputProtocol_,
613 connectionContext_);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000614 } catch (const TTransportException &ttx) {
615 GlobalOutput.printf("TNonblockingServer transport error in "
616 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000617 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000618 close();
619 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000620 } catch (const std::exception &x) {
621 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
622 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000623 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000624 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000625 return;
626 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000627 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000628 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000629 close();
630 return;
631 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000632 }
633
Mark Slee402ee282007-08-23 01:43:20 +0000634 // Intentionally fall through here, the call to process has written into
635 // the writeBuffer_
636
Mark Sleee02385b2007-06-09 01:21:16 +0000637 case APP_WAIT_TASK:
638 // We have now finished processing a task and the result has been written
639 // into the outputTransport_, so we grab its contents and place them into
640 // the writeBuffer_ for actual writing by the libevent thread
641
David Reiss01fe1532010-03-09 05:19:25 +0000642 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000643 // Get the result of the operation
644 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
645
646 // If the function call generated return data, then move into the send
647 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000648 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000649 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000650
651 // Move into write state
652 writeBufferPos_ = 0;
653 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000654
David Reissaf787782008-07-03 20:29:34 +0000655 // Put the frame size into the write buffer
656 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
657 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000658
659 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000660 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000661 setWrite();
662
663 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000664 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000665
666 return;
667 }
668
David Reissc51986f2009-03-24 20:01:25 +0000669 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000670 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000671 goto LABEL_APP_INIT;
672
Mark Slee2f6404d2006-10-10 01:37:40 +0000673 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000674 // it's now safe to perform buffer size housekeeping.
675 if (writeBufferSize_ > largestWriteBufferSize_) {
676 largestWriteBufferSize_ = writeBufferSize_;
677 }
678 if (server_->getResizeBufferEveryN() > 0
679 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
680 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
681 server_->getIdleWriteBufferLimit());
682 callsForResize_ = 0;
683 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000684
685 // N.B.: We also intentionally fall through here into the INIT state!
686
Mark Slee92f00fb2006-10-25 01:28:17 +0000687 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000688 case APP_INIT:
689
690 // Clear write buffer variables
691 writeBuffer_ = NULL;
692 writeBufferPos_ = 0;
693 writeBufferSize_ = 0;
694
Mark Slee2f6404d2006-10-10 01:37:40 +0000695 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000696 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000697 appState_ = APP_READ_FRAME_SIZE;
698
David Reiss89a12942010-10-06 17:10:52 +0000699 readBufferPos_ = 0;
700
Mark Slee2f6404d2006-10-10 01:37:40 +0000701 // Register read event
702 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000703
Mark Slee2f6404d2006-10-10 01:37:40 +0000704 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000705 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000706
707 return;
708
709 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000710 // We just read the request length
711 // Double the buffer size until it is big enough
712 if (readWant_ > readBufferSize_) {
713 if (readBufferSize_ == 0) {
714 readBufferSize_ = 1;
715 }
716 uint32_t newSize = readBufferSize_;
717 while (readWant_ > newSize) {
718 newSize *= 2;
719 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000720
David Reiss89a12942010-10-06 17:10:52 +0000721 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
722 if (newBuffer == NULL) {
723 // nothing else to be done...
724 throw std::bad_alloc();
725 }
726 readBuffer_ = newBuffer;
727 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000728 }
729
Mark Slee2f6404d2006-10-10 01:37:40 +0000730 readBufferPos_= 0;
731
732 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000733 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000734 appState_ = APP_READ_REQUEST;
735
736 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000737 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000738
739 return;
740
David Reiss01fe1532010-03-09 05:19:25 +0000741 case APP_CLOSE_CONNECTION:
742 server_->decrementActiveProcessors();
743 close();
744 return;
745
Mark Slee2f6404d2006-10-10 01:37:40 +0000746 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000747 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000748 assert(0);
749 }
750}
751
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000752void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000753 // Catch the do nothing case
754 if (eventFlags_ == eventFlags) {
755 return;
756 }
757
758 // Delete a previously existing event
759 if (eventFlags_ != 0) {
760 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000761 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000762 return;
763 }
764 }
765
766 // Update in memory structure
767 eventFlags_ = eventFlags;
768
Mark Slee402ee282007-08-23 01:43:20 +0000769 // Do not call event_set if there are no flags
770 if (!eventFlags_) {
771 return;
772 }
773
David Reiss01fe1532010-03-09 05:19:25 +0000774 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000775 * event_set:
776 *
777 * Prepares the event structure &event to be used in future calls to
778 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000779 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000780 *
781 * The events can be either EV_READ, EV_WRITE, or both, indicating
782 * that an application can read or write from the file respectively without
783 * blocking.
784 *
Mark Sleee02385b2007-06-09 01:21:16 +0000785 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000786 * the event and the type of event which will be one of: EV_TIMEOUT,
787 * EV_SIGNAL, EV_READ, EV_WRITE.
788 *
789 * The additional flag EV_PERSIST makes an event_add() persistent until
790 * event_del() has been called.
791 *
792 * Once initialized, the &event struct can be used repeatedly with
793 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000794 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000795 * when an ev structure has been added to libevent using event_add() the
796 * structure must persist until the event occurs (assuming EV_PERSIST
797 * is not set) or is removed using event_del(). You may not reuse the same
798 * ev structure for multiple monitored descriptors; each descriptor needs
799 * its own ev.
800 */
David Reiss105961d2010-10-06 17:10:17 +0000801 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
802 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000803 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000804
805 // Add the event
806 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000807 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000808 }
809}
810
811/**
812 * Closes a connection
813 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000814void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000815 // Delete the registered libevent
816 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000817 GlobalOutput.perror("TConnection::close() event_del", errno);
818 }
819
820 if (serverEventHandler_ != NULL) {
821 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000822 }
823
824 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000825 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000826
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000827 // close any factory produced transports
828 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000829 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000830
Mark Slee2f6404d2006-10-10 01:37:40 +0000831 // Give this object back to the server that owns it
832 server_->returnConnection(this);
833}
834
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000835void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
836 size_t readLimit,
837 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000838 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000839 free(readBuffer_);
840 readBuffer_ = NULL;
841 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000842 }
David Reiss54bec5d2010-10-06 17:10:45 +0000843
844 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
845 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000846 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000847 largestWriteBufferSize_ = 0;
848 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000849}
850
David Reiss8ede8182010-09-02 15:26:28 +0000851TNonblockingServer::~TNonblockingServer() {
852 // TODO: We currently leak any active TConnection objects.
853 // Since we're shutting down and destroying the event_base, the TConnection
854 // objects will never receive any additional callbacks. (And even if they
855 // did, it would be bad, since they keep a pointer around to the server,
856 // which is being destroyed.)
857
858 // Clean up unused TConnection objects in connectionStack_
859 while (!connectionStack_.empty()) {
860 TConnection* connection = connectionStack_.top();
861 connectionStack_.pop();
862 delete connection;
863 }
864
Roger Meierc1905582011-08-02 23:37:36 +0000865 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000866 event_base_free(eventBase_);
867 }
868
869 if (serverSocket_ >= 0) {
870 close(serverSocket_);
871 }
872}
873
Mark Slee2f6404d2006-10-10 01:37:40 +0000874/**
875 * Creates a new connection either by reusing an object off the stack or
876 * by allocating a new one entirely
877 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000878TNonblockingServer::TConnection* TNonblockingServer::createConnection(
879 int socket, short flags,
880 const sockaddr* addr,
881 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000882 // Check the stack
883 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000884 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000885 } else {
886 TConnection* result = connectionStack_.top();
887 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000888 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000889 return result;
890 }
891}
892
893/**
894 * Returns a connection to the stack
895 */
896void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000897 if (connectionStackLimit_ &&
898 (connectionStack_.size() >= connectionStackLimit_)) {
899 delete connection;
900 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000901 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000902 connectionStack_.push(connection);
903 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000904}
905
906/**
David Reissa79e4882008-03-05 07:51:47 +0000907 * Server socket had something happen. We accept all waiting client
908 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000909 */
910void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000911 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000912 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000913 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000914
Mark Slee2f6404d2006-10-10 01:37:40 +0000915 // Server socket accepted a new connection
916 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000917 sockaddr_storage addrStorage;
918 sockaddr* addrp = (sockaddr*)&addrStorage;
919 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000920
Mark Slee2f6404d2006-10-10 01:37:40 +0000921 // Going to accept a new client socket
922 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000923
Mark Slee2f6404d2006-10-10 01:37:40 +0000924 // Accept as many new clients as possible, even though libevent signaled only
925 // one, this helps us to avoid having to go back into the libevent engine so
926 // many times
David Reiss105961d2010-10-06 17:10:17 +0000927 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000928 // If we're overloaded, take action here
929 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
930 nConnectionsDropped_++;
931 nTotalConnectionsDropped_++;
932 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
933 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000934 return;
David Reiss01fe1532010-03-09 05:19:25 +0000935 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
936 if (!drainPendingTask()) {
937 // Nothing left to discard, so we drop connection instead.
938 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000939 return;
David Reiss01fe1532010-03-09 05:19:25 +0000940 }
941 }
942 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000943 // Explicitly set this socket to NONBLOCK mode
944 int flags;
945 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
946 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000947 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000948 close(clientSocket);
949 return;
950 }
951
952 // Create a new TConnection for this client socket.
953 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000954 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000955
956 // Fail fast if we could not create a TConnection object
957 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000958 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000959 close(clientSocket);
960 return;
961 }
962
963 // Put this client connection into the proper state
964 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000965
966 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000967 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000968 }
Mark Slee79b16942007-11-26 19:05:29 +0000969
Mark Slee2f6404d2006-10-10 01:37:40 +0000970 // Done looping accept, now we have to make sure the error is due to
971 // blocking. Any other error is a problem
972 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000973 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000974 }
975}
976
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result,
 * creates and binds a stream socket, and hands it to listenSocket(int) for
 * the remaining setup.
 *
 * @throws TException if address resolution, socket creation or bind fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Three decimal digits per byte is an upper bound for any int, plus room
  // for a sign and the terminating NUL, so "%d" can never overflow this.
  char port[3 * sizeof(int) + 2];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Fail loudly like every other error path here; returning silently would
    // leave the server without a listening socket.
    throw TException("TNonblockingServer::serve() getaddrinfo " +
                     string(gai_strerror(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY; a failure is only reported
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1041
1042/**
1043 * Takes a socket created by listenSocket() and sets various options on it
1044 * to prepare for use in the server.
1045 */
1046void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001047 // Set socket to nonblocking mode
1048 int flags;
Mark Slee79b16942007-11-26 19:05:29 +00001049 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
1050 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
1051 close(s);
1052 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001053 }
1054
1055 int one = 1;
1056 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001057
1058 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001059 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001060
1061 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001062 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001063
1064 // Set TCP nodelay if available, MAC OS X Hack
1065 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1066 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001067 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001068 #endif
1069
David Reiss1c20c872010-03-09 05:20:14 +00001070 #ifdef TCP_LOW_MIN_RTO
1071 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001072 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001073 }
1074 #endif
1075
Mark Slee79b16942007-11-26 19:05:29 +00001076 if (listen(s, LISTEN_BACKLOG) == -1) {
1077 close(s);
1078 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001079 }
1080
Mark Slee79b16942007-11-26 19:05:29 +00001081 // Cool, this socket is good to go, set it as the serverSocket_
1082 serverSocket_ = s;
1083}
1084
David Reiss01fe1532010-03-09 05:19:25 +00001085void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +00001086 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1087 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1088 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +00001089 }
Roger Meier30aae0c2011-07-08 12:23:31 +00001090 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
1091 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +00001092 close(notificationPipeFDs_[0]);
1093 close(notificationPipeFDs_[1]);
1094 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1095 }
David Reiss01fe1532010-03-09 05:19:25 +00001096}
1097
Mark Slee79b16942007-11-26 19:05:29 +00001098/**
1099 * Register the core libevent events onto the proper base.
1100 */
Roger Meierc1905582011-08-02 23:37:36 +00001101void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +00001102 assert(serverSocket_ != -1);
1103 assert(!eventBase_);
1104 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +00001105 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +00001106
1107 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +00001108 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +00001109 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001110 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +00001111
1112 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +00001113 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +00001114 serverSocket_,
1115 EV_READ | EV_PERSIST,
1116 TNonblockingServer::eventHandler,
1117 this);
Mark Slee79b16942007-11-26 19:05:29 +00001118 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +00001119
1120 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +00001121 if (-1 == event_add(&serverEvent_, 0)) {
1122 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +00001123 }
David Reiss01fe1532010-03-09 05:19:25 +00001124 if (threadPoolProcessing_) {
1125 // Create an event to be notified when a task finishes
1126 event_set(&notificationEvent_,
1127 getNotificationRecvFD(),
1128 EV_READ | EV_PERSIST,
1129 TConnection::taskHandler,
1130 this);
David Reiss1c20c872010-03-09 05:20:14 +00001131
David Reiss01fe1532010-03-09 05:19:25 +00001132 // Attach to the base
1133 event_base_set(eventBase_, &notificationEvent_);
1134
1135 // Add the event and start up the server
1136 if (-1 == event_add(&notificationEvent_, 0)) {
1137 throw TException("TNonblockingServer::serve(): notification event_add fail");
1138 }
1139 }
1140}
1141
David Reiss068f4162010-03-09 05:19:45 +00001142void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1143 threadManager_ = threadManager;
1144 if (threadManager != NULL) {
1145 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1146 threadPoolProcessing_ = true;
1147 } else {
1148 threadPoolProcessing_ = false;
1149 }
1150}
1151
David Reiss01fe1532010-03-09 05:19:25 +00001152bool TNonblockingServer::serverOverloaded() {
1153 size_t activeConnections = numTConnections_ - connectionStack_.size();
1154 if (numActiveProcessors_ > maxActiveProcessors_ ||
1155 activeConnections > maxConnections_) {
1156 if (!overloaded_) {
1157 GlobalOutput.printf("thrift non-blocking server overload condition");
1158 overloaded_ = true;
1159 }
1160 } else {
1161 if (overloaded_ &&
1162 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
1163 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1164 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
1165 nConnectionsDropped_, nTotalConnectionsDropped_);
1166 nConnectionsDropped_ = 0;
1167 overloaded_ = false;
1168 }
1169 }
1170
1171 return overloaded_;
1172}
1173
1174bool TNonblockingServer::drainPendingTask() {
1175 if (threadManager_) {
1176 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1177 if (task) {
1178 TConnection* connection =
1179 static_cast<TConnection::Task*>(task.get())->getTConnection();
1180 assert(connection && connection->getServer()
1181 && connection->getState() == APP_WAIT_TASK);
1182 connection->forceClose();
1183 return true;
1184 }
1185 }
1186 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001187}
1188
David Reiss068f4162010-03-09 05:19:45 +00001189void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1190 TConnection* connection =
1191 static_cast<TConnection::Task*>(task.get())->getTConnection();
1192 assert(connection && connection->getServer()
1193 && connection->getState() == APP_WAIT_TASK);
1194 connection->forceClose();
1195}
1196
Mark Slee79b16942007-11-26 19:05:29 +00001197/**
1198 * Main workhorse function, starts up the server listening on a port and
1199 * loops over the libevent handler.
1200 */
1201void TNonblockingServer::serve() {
1202 // Init socket
1203 listenSocket();
1204
David Reiss01fe1532010-03-09 05:19:25 +00001205 if (threadPoolProcessing_) {
1206 // Init task completion notification pipe
1207 createNotificationPipe();
1208 }
1209
Mark Slee79b16942007-11-26 19:05:29 +00001210 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001211 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +00001212
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001213 // Run the preServe event
1214 if (eventHandler_ != NULL) {
1215 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001216 }
1217
Bryan Duxbury76c43682011-08-24 21:26:48 +00001218 // Run libevent engine, invokes calls to eventHandler
1219 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +00001220 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +00001221}
1222
Bryan Duxbury76c43682011-08-24 21:26:48 +00001223void TNonblockingServer::stop() {
1224 if (!eventBase_) {
1225 return;
1226 }
1227
1228 // Call event_base_loopbreak() to tell libevent to exit the loop
1229 //
1230 // (The libevent documentation doesn't explicitly state that this function is
1231 // safe to call from another thread. However, all it does is set a variable,
1232 // in the event_base, so it should be fine.)
1233 event_base_loopbreak(eventBase_);
1234
1235 // event_base_loopbreak() only causes the loop to exit the next time it wakes
1236 // up. We need to force it to wake up, in case there are no real events
1237 // it needs to process.
1238 //
1239 // Attempt to connect to the server socket. If anything fails,
1240 // we'll just have to wait until libevent wakes up on its own.
1241 //
1242 // First create a socket
1243 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
1244 if (fd < 0) {
1245 return;
1246 }
1247
1248 // Set up the address
1249 struct sockaddr_in addr;
1250 addr.sin_family = AF_INET;
1251 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
1252 addr.sin_port = htons(port_);
1253
1254 // Finally do the connect().
1255 // We don't care about the return value;
1256 // we're just going to close the socket either way.
1257 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
1258 close(fd);
1259}
1260
T Jake Lucianib5e62212009-01-31 22:36:20 +00001261}}} // apache::thrift::server