blob: 2b910a584151fc22fafd040ea90bf3be8f4aa812 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include "TNonblockingServer.h"
8
9#include <sys/socket.h>
10#include <netinet/in.h>
11#include <netinet/tcp.h>
12#include <fcntl.h>
13#include <errno.h>
14#include <assert.h>
15
16namespace facebook { namespace thrift { namespace server {
17
Mark Slee5ea15f92007-03-05 22:55:59 +000018using namespace facebook::thrift::protocol;
19using namespace facebook::thrift::transport;
20
21void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000022 socket_ = socket;
23 server_ = s;
24 appState_ = APP_INIT;
25 eventFlags_ = 0;
26
27 readBufferPos_ = 0;
28 readWant_ = 0;
29
30 writeBuffer_ = NULL;
31 writeBufferSize_ = 0;
32 writeBufferPos_ = 0;
33
34 socketState_ = SOCKET_RECV;
35 appState_ = APP_INIT;
36
37 // Set flags, which also registers the event
38 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000039
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000040 // get input/transports
41 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
42 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000043
44 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000045 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
46 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000047}
48
49void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000050 int flags=0, got=0, left=0, sent=0;
51 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +000052
53 switch (socketState_) {
54 case SOCKET_RECV:
55 // It is an error to be in this state if we already have all the data
56 assert(readBufferPos_ < readWant_);
57
Mark Slee2f6404d2006-10-10 01:37:40 +000058 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +000059 if (readWant_ > readBufferSize_) {
60 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +000061 readBufferSize_ *= 2;
62 }
63 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
64 if (readBuffer_ == NULL) {
65 perror("TConnection::workSocket() realloc");
66 close();
67 return;
68 }
69 }
70
71 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +000072 fetch = readWant_ - readBufferPos_;
73 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +000074
75 if (got > 0) {
76 // Move along in the buffer
77 readBufferPos_ += got;
78
79 // Check that we did not overdo it
80 assert(readBufferPos_ <= readWant_);
81
82 // We are done reading, move onto the next state
83 if (readBufferPos_ == readWant_) {
84 transition();
85 }
86 return;
87 } else if (got == -1) {
88 // Blocking errors are okay, just move on
89 if (errno == EAGAIN || errno == EWOULDBLOCK) {
90 return;
91 }
92
93 if (errno != ECONNRESET) {
94 perror("TConnection::workSocket() recv -1");
95 }
96 }
97
98 // Whenever we get down here it means a remote disconnect
99 close();
100
101 return;
102
103 case SOCKET_SEND:
104 // Should never have position past size
105 assert(writeBufferPos_ <= writeBufferSize_);
106
107 // If there is no data to send, then let us move on
108 if (writeBufferPos_ == writeBufferSize_) {
109 fprintf(stderr, "WARNING: Send state with no data to send\n");
110 transition();
111 return;
112 }
113
114 flags = 0;
115 #ifdef MSG_NOSIGNAL
116 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
117 // check for the EPIPE return condition and close the socket in that case
118 flags |= MSG_NOSIGNAL;
119 #endif // ifdef MSG_NOSIGNAL
120
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000121 left = writeBufferSize_ - writeBufferPos_;
122 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000123
124 if (sent <= 0) {
125 // Blocking errors are okay, just move on
126 if (errno == EAGAIN || errno == EWOULDBLOCK) {
127 return;
128 }
129 if (errno != EPIPE) {
130 perror("TConnection::workSocket() send -1");
131 }
132 close();
133 return;
134 }
135
136 writeBufferPos_ += sent;
137
138 // Did we overdo it?
139 assert(writeBufferPos_ <= writeBufferSize_);
140
141 // We are done!
142 if (writeBufferPos_ == writeBufferSize_) {
143 transition();
144 }
145
146 return;
147
148 default:
149 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
150 assert(0);
151 }
152}
153
154/**
155 * This is called when the application transitions from one state into
156 * another. This means that it has finished writing the data that it needed
157 * to, or finished receiving the data that it needed to.
158 */
159void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000160
161 int sz = 0;
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 // Switch upon the state that we are currently in and move to a new state
164 switch (appState_) {
165
166 case APP_READ_REQUEST:
167 // We are done reading the request, package the read buffer into transport
168 // and get back some data from the dispatch function
169 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
170 outputTransport_->resetBuffer();
171
172 try {
173 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000174 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000175 } catch (TTransportException &ttx) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000176 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000178 return;
179 } catch (TException &x) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000180 fprintf(stderr, "TException: Server::process() %s\n", x.what());
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000181 close();
182 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 } catch (...) {
184 fprintf(stderr, "Server::process() unknown exception\n");
185 close();
186 return;
187 }
188
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 // Get the result of the operation
190 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
191
192 // If the function call generated return data, then move into the send
193 // state and get going
194 if (writeBufferSize_ > 0) {
195
196 // Move into write state
197 writeBufferPos_ = 0;
198 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000199
200 if (server_->getFrameResponses()) {
201 // Put the frame size into the write buffer
202 appState_ = APP_SEND_FRAME_SIZE;
203 frameSize_ = (int32_t)htonl(writeBufferSize_);
204 writeBuffer_ = (uint8_t*)&frameSize_;
205 writeBufferSize_ = 4;
206 } else {
207 // Go straight into sending the result, do not frame it
208 appState_ = APP_SEND_RESULT;
209 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000210
211 // Socket into write mode
212 setWrite();
213
214 // Try to work the socket immediately
215 workSocket();
216
217 return;
218 }
219
220 // In this case, the request was asynchronous and we should fall through
221 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000222 goto LABEL_APP_INIT;
223
224 case APP_SEND_FRAME_SIZE:
225
226 // Refetch the result of the operation since we put the frame size into
227 // writeBuffer_
228 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
229 writeBufferPos_ = 0;
230
231 // Now in send result state
232 appState_ = APP_SEND_RESULT;
233
234 // Go to work on the socket right away, probably still writeable
235 workSocket();
236
237 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000238
239 case APP_SEND_RESULT:
240
241 // N.B.: We also intentionally fall through here into the INIT state!
242
Mark Slee92f00fb2006-10-25 01:28:17 +0000243 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000244 case APP_INIT:
245
246 // Clear write buffer variables
247 writeBuffer_ = NULL;
248 writeBufferPos_ = 0;
249 writeBufferSize_ = 0;
250
251 // Set up read buffer for getting 4 bytes
252 readBufferPos_ = 0;
253 readWant_ = 4;
254
255 // Into read4 state we go
256 socketState_ = SOCKET_RECV;
257 appState_ = APP_READ_FRAME_SIZE;
258
259 // Register read event
260 setRead();
261
262 // Try to work the socket right away
263 workSocket();
264
265 return;
266
267 case APP_READ_FRAME_SIZE:
268 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000269 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 sz = (int32_t)ntohl(sz);
271
272 if (sz <= 0) {
273 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
274 close();
275 return;
276 }
277
278 // Reset the read buffer
279 readWant_ = (uint32_t)sz;
280 readBufferPos_= 0;
281
282 // Move into read request state
283 appState_ = APP_READ_REQUEST;
284
285 // Work the socket right away
286 workSocket();
287
288 return;
289
290 default:
291 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
292 assert(0);
293 }
294}
295
296void TConnection::setFlags(short eventFlags) {
297 // Catch the do nothing case
298 if (eventFlags_ == eventFlags) {
299 return;
300 }
301
302 // Delete a previously existing event
303 if (eventFlags_ != 0) {
304 if (event_del(&event_) == -1) {
305 perror("TConnection::setFlags event_del");
306 return;
307 }
308 }
309
310 // Update in memory structure
311 eventFlags_ = eventFlags;
312
313 /**
314 * event_set:
315 *
316 * Prepares the event structure &event to be used in future calls to
317 * event_add() and event_del(). The event will be prepared to call the
318 * event_handler using the 'sock' file descriptor to monitor events.
319 *
320 * The events can be either EV_READ, EV_WRITE, or both, indicating
321 * that an application can read or write from the file respectively without
322 * blocking.
323 *
324 * The event_handler will be called with the file descriptor that triggered
325 * the event and the type of event which will be one of: EV_TIMEOUT,
326 * EV_SIGNAL, EV_READ, EV_WRITE.
327 *
328 * The additional flag EV_PERSIST makes an event_add() persistent until
329 * event_del() has been called.
330 *
331 * Once initialized, the &event struct can be used repeatedly with
332 * event_add() and event_del() and does not need to be reinitialized unless
333 * the event_handler and/or the argument to it are to be changed. However,
334 * when an ev structure has been added to libevent using event_add() the
335 * structure must persist until the event occurs (assuming EV_PERSIST
336 * is not set) or is removed using event_del(). You may not reuse the same
337 * ev structure for multiple monitored descriptors; each descriptor needs
338 * its own ev.
339 */
340 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
341
342 // Add the event
343 if (event_add(&event_, 0) == -1) {
344 perror("TConnection::setFlags(): coult not event_add");
345 }
346}
347
348/**
349 * Closes a connection
350 */
351void TConnection::close() {
352 // Delete the registered libevent
353 if (event_del(&event_) == -1) {
354 perror("TConnection::close() event_del");
355 }
356
357 // Close the socket
358 if (socket_ > 0) {
359 ::close(socket_);
360 }
361 socket_ = 0;
362
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000363 // close any factory produced transports
364 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000365 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000366
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 // Give this object back to the server that owns it
368 server_->returnConnection(this);
369}
370
371/**
372 * Creates a new connection either by reusing an object off the stack or
373 * by allocating a new one entirely
374 */
375TConnection* TNonblockingServer::createConnection(int socket, short flags) {
376 // Check the stack
377 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000378 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 } else {
380 TConnection* result = connectionStack_.top();
381 connectionStack_.pop();
382 result->init(socket, flags, this);
383 return result;
384 }
385}
386
387/**
388 * Returns a connection to the stack
389 */
390void TNonblockingServer::returnConnection(TConnection* connection) {
391 connectionStack_.push(connection);
392}
393
394/**
395 * Server socket had something happen
396 */
397void TNonblockingServer::handleEvent(int fd, short which) {
398 // Make sure that libevent didn't fuck up the socket handles
399 assert(fd == serverSocket_);
400
401 // Server socket accepted a new connection
402 socklen_t addrLen;
403 struct sockaddr addr;
404 addrLen = sizeof(addr);
405
406 // Going to accept a new client socket
407 int clientSocket;
408
409 // Accept as many new clients as possible, even though libevent signaled only
410 // one, this helps us to avoid having to go back into the libevent engine so
411 // many times
412 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
413
414 // Explicitly set this socket to NONBLOCK mode
415 int flags;
416 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
417 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
418 perror("thriftServerEventHandler: set O_NONBLOCK");
419 close(clientSocket);
420 return;
421 }
422
423 // Create a new TConnection for this client socket.
424 TConnection* clientConnection =
425 createConnection(clientSocket, EV_READ | EV_PERSIST);
426
427 // Fail fast if we could not create a TConnection object
428 if (clientConnection == NULL) {
429 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
430 close(clientSocket);
431 return;
432 }
433
434 // Put this client connection into the proper state
435 clientConnection->transition();
436 }
437
438 // Done looping accept, now we have to make sure the error is due to
439 // blocking. Any other error is a problem
440 if (errno != EAGAIN && errno != EWOULDBLOCK) {
441 perror("thriftServerEventHandler: accept()");
442 }
443}
444
445/**
446 * Main workhorse function, starts up the server listening on a port and
447 * loops over the libevent handler.
448 */
449void TNonblockingServer::serve() {
450 // Initialize libevent
451 event_init();
452
453 // Print some libevent stats
454 fprintf(stderr,
455 "libevent %s method %s\n",
456 event_get_version(),
457 event_get_method());
458
459 // Create the server socket
460 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
461 if (serverSocket_ == -1) {
462 perror("TNonblockingServer::serve() socket() -1");
463 return;
464 }
465
466 // Set socket to nonblocking mode
467 int flags;
468 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
469 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
470 perror("TNonblockingServer::serve() O_NONBLOCK");
471 ::close(serverSocket_);
472 return;
473 }
474
475 int one = 1;
476 struct linger ling = {0, 0};
477
478 // Set reuseaddr to avoid 2MSL delay on server restart
479 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
480
481 // Keepalive to ensure full result flushing
482 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
483
484 // Turn linger off to avoid hung sockets
485 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
486
487 // Set TCP nodelay if available, MAC OS X Hack
488 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
489 #ifndef TCP_NOPUSH
490 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
491 #endif
492
493 struct sockaddr_in addr;
494 addr.sin_family = AF_INET;
495 addr.sin_port = htons(port_);
496 addr.sin_addr.s_addr = INADDR_ANY;
497
498 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
499 perror("TNonblockingServer::serve() bind");
500 close(serverSocket_);
501 return;
502 }
503
504 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
505 perror("TNonblockingServer::serve() listen");
506 close(serverSocket_);
507 return;
508 }
509
510 // Register the server event
511 struct event serverEvent;
512 event_set(&serverEvent,
513 serverSocket_,
514 EV_READ | EV_PERSIST,
515 TNonblockingServer::eventHandler,
516 this);
517
518 // Add the event and start up the server
519 if (event_add(&serverEvent, 0) == -1) {
520 perror("TNonblockingServer::serve(): coult not event_add");
521 return;
522 }
523
524 // Run libevent engine, never returns, invokes calls to event_handler
525 event_loop(0);
526}
527
528}}} // facebook::thrift::server