blob: e73da120982d169a6d1955fc100e11e4615b6e03 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include "TNonblockingServer.h"
8
9#include <sys/socket.h>
10#include <netinet/in.h>
11#include <netinet/tcp.h>
12#include <fcntl.h>
13#include <errno.h>
14#include <assert.h>
15
16namespace facebook { namespace thrift { namespace server {
17
Aditya Agarwal1ea90522007-01-19 02:02:12 +000018 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000019 socket_ = socket;
20 server_ = s;
21 appState_ = APP_INIT;
22 eventFlags_ = 0;
23
24 readBufferPos_ = 0;
25 readWant_ = 0;
26
27 writeBuffer_ = NULL;
28 writeBufferSize_ = 0;
29 writeBufferPos_ = 0;
30
31 socketState_ = SOCKET_RECV;
32 appState_ = APP_INIT;
33
34 // Set flags, which also registers the event
35 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000036
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000037 // get input/transports
38 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
39 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000040
41 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000042 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
43 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000044}
45
46void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000047 int flags=0, got=0, left=0, sent=0;
48 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +000049
50 switch (socketState_) {
51 case SOCKET_RECV:
52 // It is an error to be in this state if we already have all the data
53 assert(readBufferPos_ < readWant_);
54
Mark Slee2f6404d2006-10-10 01:37:40 +000055 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +000056 if (readWant_ > readBufferSize_) {
57 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +000058 readBufferSize_ *= 2;
59 }
60 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
61 if (readBuffer_ == NULL) {
62 perror("TConnection::workSocket() realloc");
63 close();
64 return;
65 }
66 }
67
68 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +000069 fetch = readWant_ - readBufferPos_;
70 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +000071
72 if (got > 0) {
73 // Move along in the buffer
74 readBufferPos_ += got;
75
76 // Check that we did not overdo it
77 assert(readBufferPos_ <= readWant_);
78
79 // We are done reading, move onto the next state
80 if (readBufferPos_ == readWant_) {
81 transition();
82 }
83 return;
84 } else if (got == -1) {
85 // Blocking errors are okay, just move on
86 if (errno == EAGAIN || errno == EWOULDBLOCK) {
87 return;
88 }
89
90 if (errno != ECONNRESET) {
91 perror("TConnection::workSocket() recv -1");
92 }
93 }
94
95 // Whenever we get down here it means a remote disconnect
96 close();
97
98 return;
99
100 case SOCKET_SEND:
101 // Should never have position past size
102 assert(writeBufferPos_ <= writeBufferSize_);
103
104 // If there is no data to send, then let us move on
105 if (writeBufferPos_ == writeBufferSize_) {
106 fprintf(stderr, "WARNING: Send state with no data to send\n");
107 transition();
108 return;
109 }
110
111 flags = 0;
112 #ifdef MSG_NOSIGNAL
113 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
114 // check for the EPIPE return condition and close the socket in that case
115 flags |= MSG_NOSIGNAL;
116 #endif // ifdef MSG_NOSIGNAL
117
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000118 left = writeBufferSize_ - writeBufferPos_;
119 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000120
121 if (sent <= 0) {
122 // Blocking errors are okay, just move on
123 if (errno == EAGAIN || errno == EWOULDBLOCK) {
124 return;
125 }
126 if (errno != EPIPE) {
127 perror("TConnection::workSocket() send -1");
128 }
129 close();
130 return;
131 }
132
133 writeBufferPos_ += sent;
134
135 // Did we overdo it?
136 assert(writeBufferPos_ <= writeBufferSize_);
137
138 // We are done!
139 if (writeBufferPos_ == writeBufferSize_) {
140 transition();
141 }
142
143 return;
144
145 default:
146 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
147 assert(0);
148 }
149}
150
151/**
152 * This is called when the application transitions from one state into
153 * another. This means that it has finished writing the data that it needed
154 * to, or finished receiving the data that it needed to.
155 */
156void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000157
158 int sz = 0;
159
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 // Switch upon the state that we are currently in and move to a new state
161 switch (appState_) {
162
163 case APP_READ_REQUEST:
164 // We are done reading the request, package the read buffer into transport
165 // and get back some data from the dispatch function
166 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
167 outputTransport_->resetBuffer();
168
169 try {
170 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000171 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000172 } catch (TTransportException &ttx) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000173 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000174 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000175 return;
176 } catch (TException &x) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000177 fprintf(stderr, "TException: Server::process() %s\n", x.what());
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000178 close();
179 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000180 } catch (...) {
181 fprintf(stderr, "Server::process() unknown exception\n");
182 close();
183 return;
184 }
185
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 // Get the result of the operation
187 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
188
189 // If the function call generated return data, then move into the send
190 // state and get going
191 if (writeBufferSize_ > 0) {
192
193 // Move into write state
194 writeBufferPos_ = 0;
195 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000196
197 if (server_->getFrameResponses()) {
198 // Put the frame size into the write buffer
199 appState_ = APP_SEND_FRAME_SIZE;
200 frameSize_ = (int32_t)htonl(writeBufferSize_);
201 writeBuffer_ = (uint8_t*)&frameSize_;
202 writeBufferSize_ = 4;
203 } else {
204 // Go straight into sending the result, do not frame it
205 appState_ = APP_SEND_RESULT;
206 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000207
208 // Socket into write mode
209 setWrite();
210
211 // Try to work the socket immediately
212 workSocket();
213
214 return;
215 }
216
217 // In this case, the request was asynchronous and we should fall through
218 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000219 goto LABEL_APP_INIT;
220
221 case APP_SEND_FRAME_SIZE:
222
223 // Refetch the result of the operation since we put the frame size into
224 // writeBuffer_
225 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
226 writeBufferPos_ = 0;
227
228 // Now in send result state
229 appState_ = APP_SEND_RESULT;
230
231 // Go to work on the socket right away, probably still writeable
232 workSocket();
233
234 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000235
236 case APP_SEND_RESULT:
237
238 // N.B.: We also intentionally fall through here into the INIT state!
239
Mark Slee92f00fb2006-10-25 01:28:17 +0000240 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000241 case APP_INIT:
242
243 // Clear write buffer variables
244 writeBuffer_ = NULL;
245 writeBufferPos_ = 0;
246 writeBufferSize_ = 0;
247
248 // Set up read buffer for getting 4 bytes
249 readBufferPos_ = 0;
250 readWant_ = 4;
251
252 // Into read4 state we go
253 socketState_ = SOCKET_RECV;
254 appState_ = APP_READ_FRAME_SIZE;
255
256 // Register read event
257 setRead();
258
259 // Try to work the socket right away
260 workSocket();
261
262 return;
263
264 case APP_READ_FRAME_SIZE:
265 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000266 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000267 sz = (int32_t)ntohl(sz);
268
269 if (sz <= 0) {
270 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
271 close();
272 return;
273 }
274
275 // Reset the read buffer
276 readWant_ = (uint32_t)sz;
277 readBufferPos_= 0;
278
279 // Move into read request state
280 appState_ = APP_READ_REQUEST;
281
282 // Work the socket right away
283 workSocket();
284
285 return;
286
287 default:
288 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
289 assert(0);
290 }
291}
292
293void TConnection::setFlags(short eventFlags) {
294 // Catch the do nothing case
295 if (eventFlags_ == eventFlags) {
296 return;
297 }
298
299 // Delete a previously existing event
300 if (eventFlags_ != 0) {
301 if (event_del(&event_) == -1) {
302 perror("TConnection::setFlags event_del");
303 return;
304 }
305 }
306
307 // Update in memory structure
308 eventFlags_ = eventFlags;
309
310 /**
311 * event_set:
312 *
313 * Prepares the event structure &event to be used in future calls to
314 * event_add() and event_del(). The event will be prepared to call the
315 * event_handler using the 'sock' file descriptor to monitor events.
316 *
317 * The events can be either EV_READ, EV_WRITE, or both, indicating
318 * that an application can read or write from the file respectively without
319 * blocking.
320 *
321 * The event_handler will be called with the file descriptor that triggered
322 * the event and the type of event which will be one of: EV_TIMEOUT,
323 * EV_SIGNAL, EV_READ, EV_WRITE.
324 *
325 * The additional flag EV_PERSIST makes an event_add() persistent until
326 * event_del() has been called.
327 *
328 * Once initialized, the &event struct can be used repeatedly with
329 * event_add() and event_del() and does not need to be reinitialized unless
330 * the event_handler and/or the argument to it are to be changed. However,
331 * when an ev structure has been added to libevent using event_add() the
332 * structure must persist until the event occurs (assuming EV_PERSIST
333 * is not set) or is removed using event_del(). You may not reuse the same
334 * ev structure for multiple monitored descriptors; each descriptor needs
335 * its own ev.
336 */
337 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
338
339 // Add the event
340 if (event_add(&event_, 0) == -1) {
341 perror("TConnection::setFlags(): coult not event_add");
342 }
343}
344
345/**
346 * Closes a connection
347 */
348void TConnection::close() {
349 // Delete the registered libevent
350 if (event_del(&event_) == -1) {
351 perror("TConnection::close() event_del");
352 }
353
354 // Close the socket
355 if (socket_ > 0) {
356 ::close(socket_);
357 }
358 socket_ = 0;
359
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000360 // close any factory produced transports
361 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000362 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000363
Mark Slee2f6404d2006-10-10 01:37:40 +0000364 // Give this object back to the server that owns it
365 server_->returnConnection(this);
366}
367
368/**
369 * Creates a new connection either by reusing an object off the stack or
370 * by allocating a new one entirely
371 */
372TConnection* TNonblockingServer::createConnection(int socket, short flags) {
373 // Check the stack
374 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000375 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000376 } else {
377 TConnection* result = connectionStack_.top();
378 connectionStack_.pop();
379 result->init(socket, flags, this);
380 return result;
381 }
382}
383
384/**
385 * Returns a connection to the stack
386 */
387void TNonblockingServer::returnConnection(TConnection* connection) {
388 connectionStack_.push(connection);
389}
390
391/**
392 * Server socket had something happen
393 */
394void TNonblockingServer::handleEvent(int fd, short which) {
395 // Make sure that libevent didn't fuck up the socket handles
396 assert(fd == serverSocket_);
397
398 // Server socket accepted a new connection
399 socklen_t addrLen;
400 struct sockaddr addr;
401 addrLen = sizeof(addr);
402
403 // Going to accept a new client socket
404 int clientSocket;
405
406 // Accept as many new clients as possible, even though libevent signaled only
407 // one, this helps us to avoid having to go back into the libevent engine so
408 // many times
409 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
410
411 // Explicitly set this socket to NONBLOCK mode
412 int flags;
413 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
414 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
415 perror("thriftServerEventHandler: set O_NONBLOCK");
416 close(clientSocket);
417 return;
418 }
419
420 // Create a new TConnection for this client socket.
421 TConnection* clientConnection =
422 createConnection(clientSocket, EV_READ | EV_PERSIST);
423
424 // Fail fast if we could not create a TConnection object
425 if (clientConnection == NULL) {
426 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
427 close(clientSocket);
428 return;
429 }
430
431 // Put this client connection into the proper state
432 clientConnection->transition();
433 }
434
435 // Done looping accept, now we have to make sure the error is due to
436 // blocking. Any other error is a problem
437 if (errno != EAGAIN && errno != EWOULDBLOCK) {
438 perror("thriftServerEventHandler: accept()");
439 }
440}
441
442/**
443 * Main workhorse function, starts up the server listening on a port and
444 * loops over the libevent handler.
445 */
446void TNonblockingServer::serve() {
447 // Initialize libevent
448 event_init();
449
450 // Print some libevent stats
451 fprintf(stderr,
452 "libevent %s method %s\n",
453 event_get_version(),
454 event_get_method());
455
456 // Create the server socket
457 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
458 if (serverSocket_ == -1) {
459 perror("TNonblockingServer::serve() socket() -1");
460 return;
461 }
462
463 // Set socket to nonblocking mode
464 int flags;
465 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
466 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
467 perror("TNonblockingServer::serve() O_NONBLOCK");
468 ::close(serverSocket_);
469 return;
470 }
471
472 int one = 1;
473 struct linger ling = {0, 0};
474
475 // Set reuseaddr to avoid 2MSL delay on server restart
476 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
477
478 // Keepalive to ensure full result flushing
479 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
480
481 // Turn linger off to avoid hung sockets
482 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
483
484 // Set TCP nodelay if available, MAC OS X Hack
485 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
486 #ifndef TCP_NOPUSH
487 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
488 #endif
489
490 struct sockaddr_in addr;
491 addr.sin_family = AF_INET;
492 addr.sin_port = htons(port_);
493 addr.sin_addr.s_addr = INADDR_ANY;
494
495 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
496 perror("TNonblockingServer::serve() bind");
497 close(serverSocket_);
498 return;
499 }
500
501 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
502 perror("TNonblockingServer::serve() listen");
503 close(serverSocket_);
504 return;
505 }
506
507 // Register the server event
508 struct event serverEvent;
509 event_set(&serverEvent,
510 serverSocket_,
511 EV_READ | EV_PERSIST,
512 TNonblockingServer::eventHandler,
513 this);
514
515 // Add the event and start up the server
516 if (event_add(&serverEvent, 0) == -1) {
517 perror("TNonblockingServer::serve(): coult not event_add");
518 return;
519 }
520
521 // Run libevent engine, never returns, invokes calls to event_handler
522 event_loop(0);
523}
524
525}}} // facebook::thrift::server