blob: b817260fcc2448d19584b805237f6c0f0943b5c4 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Bryan Duxbury76c43682011-08-24 21:26:48 +000033#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000034#endif
35
36#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000037#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <fcntl.h>
41#include <errno.h>
42#include <assert.h>
43
David Reiss9b903442009-10-21 05:51:28 +000044#ifndef AF_LOCAL
45#define AF_LOCAL AF_UNIX
46#endif
47
T Jake Lucianib5e62212009-01-31 22:36:20 +000048namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000053using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000054using apache::thrift::transport::TSocket;
55using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000056
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,
  SOCKET_RECV,
  SOCKET_SEND
};

/**
 * Six states for the nonblocking server:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for a thread-pool task to finish processing the request
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
80
81/**
82 * Represents a connection that is handled via libevent. This connection
83 * essentially encapsulates a socket that has some associated libevent state.
84 */
85class TNonblockingServer::TConnection {
86 private:
87
88 /// Server handle
89 TNonblockingServer* server_;
90
91 /// Object wrapping network socket
92 boost::shared_ptr<TSocket> tSocket_;
93
94 /// Libevent object
95 struct event event_;
96
97 /// Libevent flags
98 short eventFlags_;
99
100 /// Socket mode
101 TSocketState socketState_;
102
103 /// Application state
104 TAppState appState_;
105
106 /// How much data needed to read
107 uint32_t readWant_;
108
109 /// Where in the read buffer are we
110 uint32_t readBufferPos_;
111
112 /// Read buffer
113 uint8_t* readBuffer_;
114
115 /// Read buffer size
116 uint32_t readBufferSize_;
117
118 /// Write buffer
119 uint8_t* writeBuffer_;
120
121 /// Write buffer size
122 uint32_t writeBufferSize_;
123
124 /// How far through writing are we?
125 uint32_t writeBufferPos_;
126
127 /// Largest size of write buffer seen since buffer was constructed
128 size_t largestWriteBufferSize_;
129
130 /// Count of the number of calls for use with getResizeBufferEveryN().
131 int32_t callsForResize_;
132
133 /// Task handle
134 int taskHandle_;
135
136 /// Task event
137 struct event taskEvent_;
138
139 /// Transport to read from
140 boost::shared_ptr<TMemoryBuffer> inputTransport_;
141
142 /// Transport that processor writes to
143 boost::shared_ptr<TMemoryBuffer> outputTransport_;
144
145 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
146 boost::shared_ptr<TTransport> factoryInputTransport_;
147 boost::shared_ptr<TTransport> factoryOutputTransport_;
148
149 /// Protocol decoder
150 boost::shared_ptr<TProtocol> inputProtocol_;
151
152 /// Protocol encoder
153 boost::shared_ptr<TProtocol> outputProtocol_;
154
155 /// Server event handler, if any
156 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
157
158 /// Thrift call context, if any
159 void *connectionContext_;
160
161 /// Go into read mode
162 void setRead() {
163 setFlags(EV_READ | EV_PERSIST);
164 }
165
166 /// Go into write mode
167 void setWrite() {
168 setFlags(EV_WRITE | EV_PERSIST);
169 }
170
171 /// Set socket idle
172 void setIdle() {
173 setFlags(0);
174 }
175
176 /**
177 * Set event flags for this connection.
178 *
179 * @param eventFlags flags we pass to libevent for the connection.
180 */
181 void setFlags(short eventFlags);
182
183 /**
184 * Libevent handler called (via our static wrapper) when the connection
185 * socket had something happen. Rather than use the flags libevent passed,
186 * we use the connection state to determine whether we need to read or
187 * write the socket.
188 */
189 void workSocket();
190
191 /// Close this connection and free or reset its resources.
192 void close();
193
194 public:
195
196 class Task;
197
198 /// Constructor
199 TConnection(int socket, short eventFlags, TNonblockingServer *s,
200 const sockaddr* addr, socklen_t addrLen) {
201 readBuffer_ = NULL;
202 readBufferSize_ = 0;
203
204 // Allocate input and output transports
205 // these only need to be allocated once per TConnection (they don't need to be
206 // reallocated on init() call)
207 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
208 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
209 tSocket_.reset(new TSocket());
210
211 init(socket, eventFlags, s, addr, addrLen);
212 server_->incrementNumConnections();
213 }
214
215 ~TConnection() {
216 std::free(readBuffer_);
217 server_->decrementNumConnections();
218 }
219
220 /**
221 * Check buffers against any size limits and shrink it if exceeded.
222 *
223 * @param readLimit we reduce read buffer size to this (if nonzero).
224 * @param writeLimit if nonzero and write buffer is larger, replace it.
225 */
226 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
227
228 /// Initialize
229 void init(int socket, short eventFlags, TNonblockingServer *s,
230 const sockaddr* addr, socklen_t addrLen);
231
232 /**
233 * This is called when the application transitions from one state into
234 * another. This means that it has finished writing the data that it needed
235 * to, or finished receiving the data that it needed to.
236 */
237 void transition();
238
239 /**
240 * C-callable event handler for connection events. Provides a callback
241 * that libevent can understand which invokes connection_->workSocket().
242 *
243 * @param fd the descriptor the event occurred on.
244 * @param which the flags associated with the event.
245 * @param v void* callback arg where we placed TConnection's "this".
246 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000247 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000248 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
249 ((TConnection*)v)->workSocket();
250 }
251
252 /**
253 * C-callable event handler for signaling task completion. Provides a
254 * callback that libevent can understand that will read a connection
255 * object's address from a pipe and call connection->transition() for
256 * that object.
257 *
258 * @param fd the descriptor the event occurred on.
259 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000260 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000261 TConnection* connection;
262 ssize_t nBytes;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000263 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000264 == sizeof(TConnection*)) {
265 connection->transition();
266 }
267 if (nBytes > 0) {
268 throw TException("TConnection::taskHandler unexpected partial read");
269 }
270 if (errno != EWOULDBLOCK && errno != EAGAIN) {
271 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
272 }
273 }
274
275 /**
276 * Notification to server that processing has ended on this request.
277 * Can be called either when processing is completed or when a waiting
278 * task has been preemptively terminated (on overload).
279 *
280 * @return true if successful, false if unable to notify (check errno).
281 */
282 bool notifyServer() {
283 TConnection* connection = this;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000284 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
285 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000286 return false;
287 }
288
289 return true;
290 }
291
292 /// Force connection shutdown for this connection.
293 void forceClose() {
294 appState_ = APP_CLOSE_CONNECTION;
295 if (!notifyServer()) {
296 throw TException("TConnection::forceClose: failed write on notify pipe");
297 }
298 }
299
300 /// return the server this connection was initialized for.
301 TNonblockingServer* getServer() {
302 return server_;
303 }
304
305 /// get state of connection.
306 TAppState getState() {
307 return appState_;
308 }
309
310 /// return the TSocket transport wrapping this network connection
311 boost::shared_ptr<TSocket> getTSocket() const {
312 return tSocket_;
313 }
314
315 /// return the server event handler if any
316 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
317 return serverEventHandler_;
318 }
319
320 /// return the Thrift connection context if any
321 void* getConnectionContext() {
322 return connectionContext_;
323 }
324
325};
326
327class TNonblockingServer::TConnection::Task: public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000328 public:
329 Task(boost::shared_ptr<TProcessor> processor,
330 boost::shared_ptr<TProtocol> input,
331 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +0000332 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +0000333 processor_(processor),
334 input_(input),
335 output_(output),
David Reiss105961d2010-10-06 17:10:17 +0000336 connection_(connection),
337 serverEventHandler_(connection_->getServerEventHandler()),
338 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000339
340 void run() {
341 try {
David Reiss105961d2010-10-06 17:10:17 +0000342 for (;;) {
343 if (serverEventHandler_ != NULL) {
344 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
345 }
346 if (!processor_->process(input_, output_, connectionContext_) ||
347 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000348 break;
349 }
350 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000351 } catch (const TTransportException& ttx) {
352 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
353 } catch (const bad_alloc&) {
354 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +0000355 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000356 } catch (const std::exception& x) {
357 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
358 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000359 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000360 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +0000361 }
Mark Slee79b16942007-11-26 19:05:29 +0000362
David Reiss01fe1532010-03-09 05:19:25 +0000363 // Signal completion back to the libevent thread via a pipe
364 if (!connection_->notifyServer()) {
365 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000366 }
David Reiss01fe1532010-03-09 05:19:25 +0000367 }
368
369 TConnection* getTConnection() {
370 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000371 }
372
373 private:
374 boost::shared_ptr<TProcessor> processor_;
375 boost::shared_ptr<TProtocol> input_;
376 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000377 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000378 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
379 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000380};
Mark Slee5ea15f92007-03-05 22:55:59 +0000381
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000382void TNonblockingServer::TConnection::init(int socket, short eventFlags,
383 TNonblockingServer* s,
384 const sockaddr* addr,
385 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000386 tSocket_->setSocketFD(socket);
387 tSocket_->setCachedAddress(addr, addrLen);
388
Mark Slee2f6404d2006-10-10 01:37:40 +0000389 server_ = s;
390 appState_ = APP_INIT;
391 eventFlags_ = 0;
392
393 readBufferPos_ = 0;
394 readWant_ = 0;
395
396 writeBuffer_ = NULL;
397 writeBufferSize_ = 0;
398 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000399 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000400
David Reiss89a12942010-10-06 17:10:52 +0000401 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000403 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000404
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 // Set flags, which also registers the event
406 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000407
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000408 // get input/transports
409 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
410 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000411
412 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000413 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
414 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000415
416 // Set up for any server event handler
417 serverEventHandler_ = server_->getEventHandler();
418 if (serverEventHandler_ != NULL) {
419 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
420 } else {
421 connectionContext_ = NULL;
422 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000423}
424
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000425void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000426 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000427 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000428
429 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000430 case SOCKET_RECV_FRAMING:
431 union {
432 uint8_t buf[sizeof(uint32_t)];
433 int32_t size;
434 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000435
David Reiss89a12942010-10-06 17:10:52 +0000436 // if we've already received some bytes we kept them here
437 framing.size = readWant_;
438 // determine size of this frame
439 try {
440 // Read from the socket
441 fetch = tSocket_->read(&framing.buf[readBufferPos_],
442 uint32_t(sizeof(framing.size) - readBufferPos_));
443 if (fetch == 0) {
444 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000445 close();
446 return;
447 }
David Reiss89a12942010-10-06 17:10:52 +0000448 readBufferPos_ += fetch;
449 } catch (TTransportException& te) {
450 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
451 close();
452
453 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 }
455
David Reiss89a12942010-10-06 17:10:52 +0000456 if (readBufferPos_ < sizeof(framing.size)) {
457 // more needed before frame size is known -- save what we have so far
458 readWant_ = framing.size;
459 return;
460 }
461
462 readWant_ = ntohl(framing.size);
463 if (static_cast<int>(readWant_) <= 0) {
464 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
465 close();
466 return;
467 }
468 // size known; now get the rest of the frame
469 transition();
470 return;
471
472 case SOCKET_RECV:
473 // It is an error to be in this state if we already have all the data
474 assert(readBufferPos_ < readWant_);
475
David Reiss105961d2010-10-06 17:10:17 +0000476 try {
477 // Read from the socket
478 fetch = readWant_ - readBufferPos_;
479 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
480 }
481 catch (TTransportException& te) {
482 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
483 close();
Mark Slee79b16942007-11-26 19:05:29 +0000484
David Reiss105961d2010-10-06 17:10:17 +0000485 return;
486 }
487
Mark Slee2f6404d2006-10-10 01:37:40 +0000488 if (got > 0) {
489 // Move along in the buffer
490 readBufferPos_ += got;
491
492 // Check that we did not overdo it
493 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000494
Mark Slee2f6404d2006-10-10 01:37:40 +0000495 // We are done reading, move onto the next state
496 if (readBufferPos_ == readWant_) {
497 transition();
498 }
499 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000500 }
501
502 // Whenever we get down here it means a remote disconnect
503 close();
Mark Slee79b16942007-11-26 19:05:29 +0000504
Mark Slee2f6404d2006-10-10 01:37:40 +0000505 return;
506
507 case SOCKET_SEND:
508 // Should never have position past size
509 assert(writeBufferPos_ <= writeBufferSize_);
510
511 // If there is no data to send, then let us move on
512 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000513 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000514 transition();
515 return;
516 }
517
David Reiss105961d2010-10-06 17:10:17 +0000518 try {
519 left = writeBufferSize_ - writeBufferPos_;
520 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
521 }
522 catch (TTransportException& te) {
523 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000524 close();
525 return;
526 }
527
528 writeBufferPos_ += sent;
529
530 // Did we overdo it?
531 assert(writeBufferPos_ <= writeBufferSize_);
532
Mark Slee79b16942007-11-26 19:05:29 +0000533 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000534 if (writeBufferPos_ == writeBufferSize_) {
535 transition();
536 }
537
538 return;
539
540 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000541 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000542 assert(0);
543 }
544}
545
546/**
547 * This is called when the application transitions from one state into
548 * another. This means that it has finished writing the data that it needed
549 * to, or finished receiving the data that it needed to.
550 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000551void TNonblockingServer::TConnection::transition() {
552
553 int sz = 0;
554
Mark Slee2f6404d2006-10-10 01:37:40 +0000555 // Switch upon the state that we are currently in and move to a new state
556 switch (appState_) {
557
558 case APP_READ_REQUEST:
559 // We are done reading the request, package the read buffer into transport
560 // and get back some data from the dispatch function
561 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000562 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000563 // Prepend four bytes of blank space to the buffer so we can
564 // write the frame size there later.
565 outputTransport_->getWritePtr(4);
566 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000567
David Reiss01fe1532010-03-09 05:19:25 +0000568 server_->incrementActiveProcessors();
569
Mark Sleee02385b2007-06-09 01:21:16 +0000570 if (server_->isThreadPoolProcessing()) {
571 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000572
David Reiss01fe1532010-03-09 05:19:25 +0000573 // Create task and dispatch to the thread manager
574 boost::shared_ptr<Runnable> task =
575 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
576 inputProtocol_,
577 outputProtocol_,
578 this));
579 // The application is now waiting on the task to finish
580 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000581
David Reisse11f3072008-10-07 21:39:19 +0000582 try {
583 server_->addTask(task);
584 } catch (IllegalStateException & ise) {
585 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000586 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000587 close();
588 }
Mark Slee402ee282007-08-23 01:43:20 +0000589
David Reiss01fe1532010-03-09 05:19:25 +0000590 // Set this connection idle so that libevent doesn't process more
591 // data on it while we're still waiting for the threadmanager to
592 // finish this task
593 setIdle();
594 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000595 } else {
596 try {
597 // Invoke the processor
Bryan Duxbury489f8f12011-08-29 18:50:12 +0000598 server_->getProcessor()->process(inputProtocol_, outputProtocol_,
599 connectionContext_);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000600 } catch (const TTransportException &ttx) {
601 GlobalOutput.printf("TNonblockingServer transport error in "
602 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000603 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000604 close();
605 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000606 } catch (const std::exception &x) {
607 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
608 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000609 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000610 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000611 return;
612 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000613 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000614 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000615 close();
616 return;
617 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000618 }
619
Mark Slee402ee282007-08-23 01:43:20 +0000620 // Intentionally fall through here, the call to process has written into
621 // the writeBuffer_
622
Mark Sleee02385b2007-06-09 01:21:16 +0000623 case APP_WAIT_TASK:
624 // We have now finished processing a task and the result has been written
625 // into the outputTransport_, so we grab its contents and place them into
626 // the writeBuffer_ for actual writing by the libevent thread
627
David Reiss01fe1532010-03-09 05:19:25 +0000628 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000629 // Get the result of the operation
630 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
631
632 // If the function call generated return data, then move into the send
633 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000634 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000635 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000636
637 // Move into write state
638 writeBufferPos_ = 0;
639 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000640
David Reissaf787782008-07-03 20:29:34 +0000641 // Put the frame size into the write buffer
642 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
643 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000644
645 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000646 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000647 setWrite();
648
649 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000650 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000651
652 return;
653 }
654
David Reissc51986f2009-03-24 20:01:25 +0000655 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000656 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000657 goto LABEL_APP_INIT;
658
Mark Slee2f6404d2006-10-10 01:37:40 +0000659 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000660 // it's now safe to perform buffer size housekeeping.
661 if (writeBufferSize_ > largestWriteBufferSize_) {
662 largestWriteBufferSize_ = writeBufferSize_;
663 }
664 if (server_->getResizeBufferEveryN() > 0
665 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
666 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
667 server_->getIdleWriteBufferLimit());
668 callsForResize_ = 0;
669 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000670
671 // N.B.: We also intentionally fall through here into the INIT state!
672
Mark Slee92f00fb2006-10-25 01:28:17 +0000673 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000674 case APP_INIT:
675
676 // Clear write buffer variables
677 writeBuffer_ = NULL;
678 writeBufferPos_ = 0;
679 writeBufferSize_ = 0;
680
Mark Slee2f6404d2006-10-10 01:37:40 +0000681 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000682 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000683 appState_ = APP_READ_FRAME_SIZE;
684
David Reiss89a12942010-10-06 17:10:52 +0000685 readBufferPos_ = 0;
686
Mark Slee2f6404d2006-10-10 01:37:40 +0000687 // Register read event
688 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000689
Mark Slee2f6404d2006-10-10 01:37:40 +0000690 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000691 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000692
693 return;
694
695 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000696 // We just read the request length
697 // Double the buffer size until it is big enough
698 if (readWant_ > readBufferSize_) {
699 if (readBufferSize_ == 0) {
700 readBufferSize_ = 1;
701 }
702 uint32_t newSize = readBufferSize_;
703 while (readWant_ > newSize) {
704 newSize *= 2;
705 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000706
David Reiss89a12942010-10-06 17:10:52 +0000707 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
708 if (newBuffer == NULL) {
709 // nothing else to be done...
710 throw std::bad_alloc();
711 }
712 readBuffer_ = newBuffer;
713 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000714 }
715
Mark Slee2f6404d2006-10-10 01:37:40 +0000716 readBufferPos_= 0;
717
718 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000719 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000720 appState_ = APP_READ_REQUEST;
721
722 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000723 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000724
725 return;
726
David Reiss01fe1532010-03-09 05:19:25 +0000727 case APP_CLOSE_CONNECTION:
728 server_->decrementActiveProcessors();
729 close();
730 return;
731
Mark Slee2f6404d2006-10-10 01:37:40 +0000732 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000733 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000734 assert(0);
735 }
736}
737
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000738void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000739 // Catch the do nothing case
740 if (eventFlags_ == eventFlags) {
741 return;
742 }
743
744 // Delete a previously existing event
745 if (eventFlags_ != 0) {
746 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000747 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000748 return;
749 }
750 }
751
752 // Update in memory structure
753 eventFlags_ = eventFlags;
754
Mark Slee402ee282007-08-23 01:43:20 +0000755 // Do not call event_set if there are no flags
756 if (!eventFlags_) {
757 return;
758 }
759
David Reiss01fe1532010-03-09 05:19:25 +0000760 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 * event_set:
762 *
763 * Prepares the event structure &event to be used in future calls to
764 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000765 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000766 *
767 * The events can be either EV_READ, EV_WRITE, or both, indicating
768 * that an application can read or write from the file respectively without
769 * blocking.
770 *
Mark Sleee02385b2007-06-09 01:21:16 +0000771 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000772 * the event and the type of event which will be one of: EV_TIMEOUT,
773 * EV_SIGNAL, EV_READ, EV_WRITE.
774 *
775 * The additional flag EV_PERSIST makes an event_add() persistent until
776 * event_del() has been called.
777 *
778 * Once initialized, the &event struct can be used repeatedly with
779 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000780 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000781 * when an ev structure has been added to libevent using event_add() the
782 * structure must persist until the event occurs (assuming EV_PERSIST
783 * is not set) or is removed using event_del(). You may not reuse the same
784 * ev structure for multiple monitored descriptors; each descriptor needs
785 * its own ev.
786 */
David Reiss105961d2010-10-06 17:10:17 +0000787 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
788 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000789 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000790
791 // Add the event
792 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000793 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000794 }
795}
796
797/**
798 * Closes a connection
799 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000800void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000801 // Delete the registered libevent
802 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000803 GlobalOutput.perror("TConnection::close() event_del", errno);
804 }
805
806 if (serverEventHandler_ != NULL) {
807 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000808 }
809
810 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000811 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000812
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000813 // close any factory produced transports
814 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000815 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000816
Mark Slee2f6404d2006-10-10 01:37:40 +0000817 // Give this object back to the server that owns it
818 server_->returnConnection(this);
819}
820
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000821void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
822 size_t readLimit,
823 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000824 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000825 free(readBuffer_);
826 readBuffer_ = NULL;
827 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000828 }
David Reiss54bec5d2010-10-06 17:10:45 +0000829
830 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
831 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000832 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000833 largestWriteBufferSize_ = 0;
834 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000835}
836
David Reiss8ede8182010-09-02 15:26:28 +0000837TNonblockingServer::~TNonblockingServer() {
838 // TODO: We currently leak any active TConnection objects.
839 // Since we're shutting down and destroying the event_base, the TConnection
840 // objects will never receive any additional callbacks. (And even if they
841 // did, it would be bad, since they keep a pointer around to the server,
842 // which is being destroyed.)
843
844 // Clean up unused TConnection objects in connectionStack_
845 while (!connectionStack_.empty()) {
846 TConnection* connection = connectionStack_.top();
847 connectionStack_.pop();
848 delete connection;
849 }
850
Roger Meierc1905582011-08-02 23:37:36 +0000851 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000852 event_base_free(eventBase_);
853 }
854
855 if (serverSocket_ >= 0) {
856 close(serverSocket_);
857 }
858}
859
Mark Slee2f6404d2006-10-10 01:37:40 +0000860/**
861 * Creates a new connection either by reusing an object off the stack or
862 * by allocating a new one entirely
863 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000864TNonblockingServer::TConnection* TNonblockingServer::createConnection(
865 int socket, short flags,
866 const sockaddr* addr,
867 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000868 // Check the stack
869 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000870 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000871 } else {
872 TConnection* result = connectionStack_.top();
873 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000874 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000875 return result;
876 }
877}
878
879/**
880 * Returns a connection to the stack
881 */
882void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000883 if (connectionStackLimit_ &&
884 (connectionStack_.size() >= connectionStackLimit_)) {
885 delete connection;
886 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000887 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000888 connectionStack_.push(connection);
889 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000890}
891
892/**
David Reissa79e4882008-03-05 07:51:47 +0000893 * Server socket had something happen. We accept all waiting client
894 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000895 */
896void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000897 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000898 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000899 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000900
Mark Slee2f6404d2006-10-10 01:37:40 +0000901 // Server socket accepted a new connection
902 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000903 sockaddr_storage addrStorage;
904 sockaddr* addrp = (sockaddr*)&addrStorage;
905 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000906
Mark Slee2f6404d2006-10-10 01:37:40 +0000907 // Going to accept a new client socket
908 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000909
Mark Slee2f6404d2006-10-10 01:37:40 +0000910 // Accept as many new clients as possible, even though libevent signaled only
911 // one, this helps us to avoid having to go back into the libevent engine so
912 // many times
David Reiss105961d2010-10-06 17:10:17 +0000913 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000914 // If we're overloaded, take action here
915 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
916 nConnectionsDropped_++;
917 nTotalConnectionsDropped_++;
918 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
919 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000920 return;
David Reiss01fe1532010-03-09 05:19:25 +0000921 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
922 if (!drainPendingTask()) {
923 // Nothing left to discard, so we drop connection instead.
924 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000925 return;
David Reiss01fe1532010-03-09 05:19:25 +0000926 }
927 }
928 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000929 // Explicitly set this socket to NONBLOCK mode
930 int flags;
931 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
932 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000933 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000934 close(clientSocket);
935 return;
936 }
937
938 // Create a new TConnection for this client socket.
939 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000940 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000941
942 // Fail fast if we could not create a TConnection object
943 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000944 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000945 close(clientSocket);
946 return;
947 }
948
949 // Put this client connection into the proper state
950 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000951
952 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000953 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000954 }
Mark Slee79b16942007-11-26 19:05:29 +0000955
Mark Slee2f6404d2006-10-10 01:37:40 +0000956 // Done looping accept, now we have to make sure the error is due to
957 // blocking. Any other error is a problem
958 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000959 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000960 }
961}
962
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard (AI_PASSIVE) address for port_, preferring an IPv6
 * result because IPv4 clients can reach it through mapped addresses once
 * IPV6_V6ONLY is cleared.  After binding, listenSocket(int) finishes the
 * option setup, calls listen() and records the descriptor in serverSocket_.
 *
 * @throws TException if address resolution, socket creation or bind fails.
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints, *res, *res0;

  // Big enough for any int, sign included; snprintf can never overrun it.
  char port[sizeof("-2147483648")];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Without an address there is nothing to bind.  Fail here, as the
    // socket() and bind() failures below do, rather than letting serve()
    // continue with serverSocket_ never set.
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Clear V6ONLY so the IPv6 socket also accepts IPv4-mapped clients.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1027
1028/**
1029 * Takes a socket created by listenSocket() and sets various options on it
1030 * to prepare for use in the server.
1031 */
1032void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001033 // Set socket to nonblocking mode
1034 int flags;
Mark Slee79b16942007-11-26 19:05:29 +00001035 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
1036 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
1037 close(s);
1038 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001039 }
1040
1041 int one = 1;
1042 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001043
1044 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001045 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001046
1047 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001048 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001049
1050 // Set TCP nodelay if available, MAC OS X Hack
1051 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1052 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001053 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001054 #endif
1055
David Reiss1c20c872010-03-09 05:20:14 +00001056 #ifdef TCP_LOW_MIN_RTO
1057 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001058 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001059 }
1060 #endif
1061
Mark Slee79b16942007-11-26 19:05:29 +00001062 if (listen(s, LISTEN_BACKLOG) == -1) {
1063 close(s);
1064 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001065 }
1066
Mark Slee79b16942007-11-26 19:05:29 +00001067 // Cool, this socket is good to go, set it as the serverSocket_
1068 serverSocket_ = s;
1069}
1070
David Reiss01fe1532010-03-09 05:19:25 +00001071void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +00001072 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1073 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1074 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +00001075 }
Roger Meier30aae0c2011-07-08 12:23:31 +00001076 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
1077 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +00001078 close(notificationPipeFDs_[0]);
1079 close(notificationPipeFDs_[1]);
1080 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1081 }
David Reiss01fe1532010-03-09 05:19:25 +00001082}
1083
Mark Slee79b16942007-11-26 19:05:29 +00001084/**
1085 * Register the core libevent events onto the proper base.
1086 */
Roger Meierc1905582011-08-02 23:37:36 +00001087void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +00001088 assert(serverSocket_ != -1);
1089 assert(!eventBase_);
1090 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +00001091 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +00001092
1093 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +00001094 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +00001095 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001096 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +00001097
1098 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +00001099 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +00001100 serverSocket_,
1101 EV_READ | EV_PERSIST,
1102 TNonblockingServer::eventHandler,
1103 this);
Mark Slee79b16942007-11-26 19:05:29 +00001104 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +00001105
1106 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +00001107 if (-1 == event_add(&serverEvent_, 0)) {
1108 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +00001109 }
David Reiss01fe1532010-03-09 05:19:25 +00001110 if (threadPoolProcessing_) {
1111 // Create an event to be notified when a task finishes
1112 event_set(&notificationEvent_,
1113 getNotificationRecvFD(),
1114 EV_READ | EV_PERSIST,
1115 TConnection::taskHandler,
1116 this);
David Reiss1c20c872010-03-09 05:20:14 +00001117
David Reiss01fe1532010-03-09 05:19:25 +00001118 // Attach to the base
1119 event_base_set(eventBase_, &notificationEvent_);
1120
1121 // Add the event and start up the server
1122 if (-1 == event_add(&notificationEvent_, 0)) {
1123 throw TException("TNonblockingServer::serve(): notification event_add fail");
1124 }
1125 }
1126}
1127
David Reiss068f4162010-03-09 05:19:45 +00001128void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1129 threadManager_ = threadManager;
1130 if (threadManager != NULL) {
1131 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1132 threadPoolProcessing_ = true;
1133 } else {
1134 threadPoolProcessing_ = false;
1135 }
1136}
1137
David Reiss01fe1532010-03-09 05:19:25 +00001138bool TNonblockingServer::serverOverloaded() {
1139 size_t activeConnections = numTConnections_ - connectionStack_.size();
1140 if (numActiveProcessors_ > maxActiveProcessors_ ||
1141 activeConnections > maxConnections_) {
1142 if (!overloaded_) {
1143 GlobalOutput.printf("thrift non-blocking server overload condition");
1144 overloaded_ = true;
1145 }
1146 } else {
1147 if (overloaded_ &&
1148 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
1149 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1150 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
1151 nConnectionsDropped_, nTotalConnectionsDropped_);
1152 nConnectionsDropped_ = 0;
1153 overloaded_ = false;
1154 }
1155 }
1156
1157 return overloaded_;
1158}
1159
1160bool TNonblockingServer::drainPendingTask() {
1161 if (threadManager_) {
1162 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1163 if (task) {
1164 TConnection* connection =
1165 static_cast<TConnection::Task*>(task.get())->getTConnection();
1166 assert(connection && connection->getServer()
1167 && connection->getState() == APP_WAIT_TASK);
1168 connection->forceClose();
1169 return true;
1170 }
1171 }
1172 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001173}
1174
David Reiss068f4162010-03-09 05:19:45 +00001175void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1176 TConnection* connection =
1177 static_cast<TConnection::Task*>(task.get())->getTConnection();
1178 assert(connection && connection->getServer()
1179 && connection->getState() == APP_WAIT_TASK);
1180 connection->forceClose();
1181}
1182
Mark Slee79b16942007-11-26 19:05:29 +00001183/**
1184 * Main workhorse function, starts up the server listening on a port and
1185 * loops over the libevent handler.
1186 */
1187void TNonblockingServer::serve() {
1188 // Init socket
1189 listenSocket();
1190
David Reiss01fe1532010-03-09 05:19:25 +00001191 if (threadPoolProcessing_) {
1192 // Init task completion notification pipe
1193 createNotificationPipe();
1194 }
1195
Mark Slee79b16942007-11-26 19:05:29 +00001196 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001197 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +00001198
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001199 // Run the preServe event
1200 if (eventHandler_ != NULL) {
1201 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001202 }
1203
Bryan Duxbury76c43682011-08-24 21:26:48 +00001204 // Run libevent engine, invokes calls to eventHandler
1205 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +00001206 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +00001207}
1208
Bryan Duxbury76c43682011-08-24 21:26:48 +00001209void TNonblockingServer::stop() {
1210 if (!eventBase_) {
1211 return;
1212 }
1213
1214 // Call event_base_loopbreak() to tell libevent to exit the loop
1215 //
1216 // (The libevent documentation doesn't explicitly state that this function is
1217 // safe to call from another thread. However, all it does is set a variable,
1218 // in the event_base, so it should be fine.)
1219 event_base_loopbreak(eventBase_);
1220
1221 // event_base_loopbreak() only causes the loop to exit the next time it wakes
1222 // up. We need to force it to wake up, in case there are no real events
1223 // it needs to process.
1224 //
1225 // Attempt to connect to the server socket. If anything fails,
1226 // we'll just have to wait until libevent wakes up on its own.
1227 //
1228 // First create a socket
1229 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
1230 if (fd < 0) {
1231 return;
1232 }
1233
1234 // Set up the address
1235 struct sockaddr_in addr;
1236 addr.sin_family = AF_INET;
1237 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
1238 addr.sin_port = htons(port_);
1239
1240 // Finally do the connect().
1241 // We don't care about the return value;
1242 // we're just going to close the socket either way.
1243 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
1244 close(fd);
1245}
1246
T Jake Lucianib5e62212009-01-31 22:36:20 +00001247}}} // apache::thrift::server