blob: 2fe1bf7e7a8737812d5099e3cbe6f008cb505e36 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#include "TNonblockingServer.h"
2
3#include <sys/socket.h>
4#include <netinet/in.h>
5#include <netinet/tcp.h>
6#include <fcntl.h>
7#include <errno.h>
8#include <assert.h>
9
10namespace facebook { namespace thrift { namespace server {
11
Aditya Agarwal1ea90522007-01-19 02:02:12 +000012 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000013 socket_ = socket;
14 server_ = s;
15 appState_ = APP_INIT;
16 eventFlags_ = 0;
17
18 readBufferPos_ = 0;
19 readWant_ = 0;
20
21 writeBuffer_ = NULL;
22 writeBufferSize_ = 0;
23 writeBufferPos_ = 0;
24
25 socketState_ = SOCKET_RECV;
26 appState_ = APP_INIT;
27
28 // Set flags, which also registers the event
29 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000030
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000031 // get input/transports
32 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
33 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000034
35 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000036 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
37 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000038}
39
40void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000041 int flags=0, got=0, left=0, sent=0;
42 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
44 switch (socketState_) {
45 case SOCKET_RECV:
46 // It is an error to be in this state if we already have all the data
47 assert(readBufferPos_ < readWant_);
48
Mark Slee2f6404d2006-10-10 01:37:40 +000049 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +000050 if (readWant_ > readBufferSize_) {
51 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +000052 readBufferSize_ *= 2;
53 }
54 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
55 if (readBuffer_ == NULL) {
56 perror("TConnection::workSocket() realloc");
57 close();
58 return;
59 }
60 }
61
62 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +000063 fetch = readWant_ - readBufferPos_;
64 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +000065
66 if (got > 0) {
67 // Move along in the buffer
68 readBufferPos_ += got;
69
70 // Check that we did not overdo it
71 assert(readBufferPos_ <= readWant_);
72
73 // We are done reading, move onto the next state
74 if (readBufferPos_ == readWant_) {
75 transition();
76 }
77 return;
78 } else if (got == -1) {
79 // Blocking errors are okay, just move on
80 if (errno == EAGAIN || errno == EWOULDBLOCK) {
81 return;
82 }
83
84 if (errno != ECONNRESET) {
85 perror("TConnection::workSocket() recv -1");
86 }
87 }
88
89 // Whenever we get down here it means a remote disconnect
90 close();
91
92 return;
93
94 case SOCKET_SEND:
95 // Should never have position past size
96 assert(writeBufferPos_ <= writeBufferSize_);
97
98 // If there is no data to send, then let us move on
99 if (writeBufferPos_ == writeBufferSize_) {
100 fprintf(stderr, "WARNING: Send state with no data to send\n");
101 transition();
102 return;
103 }
104
105 flags = 0;
106 #ifdef MSG_NOSIGNAL
107 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
108 // check for the EPIPE return condition and close the socket in that case
109 flags |= MSG_NOSIGNAL;
110 #endif // ifdef MSG_NOSIGNAL
111
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000112 left = writeBufferSize_ - writeBufferPos_;
113 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000114
115 if (sent <= 0) {
116 // Blocking errors are okay, just move on
117 if (errno == EAGAIN || errno == EWOULDBLOCK) {
118 return;
119 }
120 if (errno != EPIPE) {
121 perror("TConnection::workSocket() send -1");
122 }
123 close();
124 return;
125 }
126
127 writeBufferPos_ += sent;
128
129 // Did we overdo it?
130 assert(writeBufferPos_ <= writeBufferSize_);
131
132 // We are done!
133 if (writeBufferPos_ == writeBufferSize_) {
134 transition();
135 }
136
137 return;
138
139 default:
140 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
141 assert(0);
142 }
143}
144
145/**
146 * This is called when the application transitions from one state into
147 * another. This means that it has finished writing the data that it needed
148 * to, or finished receiving the data that it needed to.
149 */
150void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000151
152 int sz = 0;
153
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 // Switch upon the state that we are currently in and move to a new state
155 switch (appState_) {
156
157 case APP_READ_REQUEST:
158 // We are done reading the request, package the read buffer into transport
159 // and get back some data from the dispatch function
160 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
161 outputTransport_->resetBuffer();
162
163 try {
164 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000165 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000166 } catch (TTransportException &ttx) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000167 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000169 return;
170 } catch (TException &x) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000171 fprintf(stderr, "TException: Server::process() %s\n", x.what());
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000172 close();
173 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000174 } catch (...) {
175 fprintf(stderr, "Server::process() unknown exception\n");
176 close();
177 return;
178 }
179
Mark Slee2f6404d2006-10-10 01:37:40 +0000180 // Get the result of the operation
181 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
182
183 // If the function call generated return data, then move into the send
184 // state and get going
185 if (writeBufferSize_ > 0) {
186
187 // Move into write state
188 writeBufferPos_ = 0;
189 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000190
191 if (server_->getFrameResponses()) {
192 // Put the frame size into the write buffer
193 appState_ = APP_SEND_FRAME_SIZE;
194 frameSize_ = (int32_t)htonl(writeBufferSize_);
195 writeBuffer_ = (uint8_t*)&frameSize_;
196 writeBufferSize_ = 4;
197 } else {
198 // Go straight into sending the result, do not frame it
199 appState_ = APP_SEND_RESULT;
200 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000201
202 // Socket into write mode
203 setWrite();
204
205 // Try to work the socket immediately
206 workSocket();
207
208 return;
209 }
210
211 // In this case, the request was asynchronous and we should fall through
212 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000213 goto LABEL_APP_INIT;
214
215 case APP_SEND_FRAME_SIZE:
216
217 // Refetch the result of the operation since we put the frame size into
218 // writeBuffer_
219 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
220 writeBufferPos_ = 0;
221
222 // Now in send result state
223 appState_ = APP_SEND_RESULT;
224
225 // Go to work on the socket right away, probably still writeable
226 workSocket();
227
228 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000229
230 case APP_SEND_RESULT:
231
232 // N.B.: We also intentionally fall through here into the INIT state!
233
Mark Slee92f00fb2006-10-25 01:28:17 +0000234 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000235 case APP_INIT:
236
237 // Clear write buffer variables
238 writeBuffer_ = NULL;
239 writeBufferPos_ = 0;
240 writeBufferSize_ = 0;
241
242 // Set up read buffer for getting 4 bytes
243 readBufferPos_ = 0;
244 readWant_ = 4;
245
246 // Into read4 state we go
247 socketState_ = SOCKET_RECV;
248 appState_ = APP_READ_FRAME_SIZE;
249
250 // Register read event
251 setRead();
252
253 // Try to work the socket right away
254 workSocket();
255
256 return;
257
258 case APP_READ_FRAME_SIZE:
259 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000260 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000261 sz = (int32_t)ntohl(sz);
262
263 if (sz <= 0) {
264 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
265 close();
266 return;
267 }
268
269 // Reset the read buffer
270 readWant_ = (uint32_t)sz;
271 readBufferPos_= 0;
272
273 // Move into read request state
274 appState_ = APP_READ_REQUEST;
275
276 // Work the socket right away
277 workSocket();
278
279 return;
280
281 default:
282 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
283 assert(0);
284 }
285}
286
287void TConnection::setFlags(short eventFlags) {
288 // Catch the do nothing case
289 if (eventFlags_ == eventFlags) {
290 return;
291 }
292
293 // Delete a previously existing event
294 if (eventFlags_ != 0) {
295 if (event_del(&event_) == -1) {
296 perror("TConnection::setFlags event_del");
297 return;
298 }
299 }
300
301 // Update in memory structure
302 eventFlags_ = eventFlags;
303
304 /**
305 * event_set:
306 *
307 * Prepares the event structure &event to be used in future calls to
308 * event_add() and event_del(). The event will be prepared to call the
309 * event_handler using the 'sock' file descriptor to monitor events.
310 *
311 * The events can be either EV_READ, EV_WRITE, or both, indicating
312 * that an application can read or write from the file respectively without
313 * blocking.
314 *
315 * The event_handler will be called with the file descriptor that triggered
316 * the event and the type of event which will be one of: EV_TIMEOUT,
317 * EV_SIGNAL, EV_READ, EV_WRITE.
318 *
319 * The additional flag EV_PERSIST makes an event_add() persistent until
320 * event_del() has been called.
321 *
322 * Once initialized, the &event struct can be used repeatedly with
323 * event_add() and event_del() and does not need to be reinitialized unless
324 * the event_handler and/or the argument to it are to be changed. However,
325 * when an ev structure has been added to libevent using event_add() the
326 * structure must persist until the event occurs (assuming EV_PERSIST
327 * is not set) or is removed using event_del(). You may not reuse the same
328 * ev structure for multiple monitored descriptors; each descriptor needs
329 * its own ev.
330 */
331 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
332
333 // Add the event
334 if (event_add(&event_, 0) == -1) {
335 perror("TConnection::setFlags(): coult not event_add");
336 }
337}
338
339/**
340 * Closes a connection
341 */
342void TConnection::close() {
343 // Delete the registered libevent
344 if (event_del(&event_) == -1) {
345 perror("TConnection::close() event_del");
346 }
347
348 // Close the socket
349 if (socket_ > 0) {
350 ::close(socket_);
351 }
352 socket_ = 0;
353
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000354 // close any factory produced transports
355 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000356 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000357
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 // Give this object back to the server that owns it
359 server_->returnConnection(this);
360}
361
362/**
363 * Creates a new connection either by reusing an object off the stack or
364 * by allocating a new one entirely
365 */
366TConnection* TNonblockingServer::createConnection(int socket, short flags) {
367 // Check the stack
368 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000369 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000370 } else {
371 TConnection* result = connectionStack_.top();
372 connectionStack_.pop();
373 result->init(socket, flags, this);
374 return result;
375 }
376}
377
378/**
379 * Returns a connection to the stack
380 */
381void TNonblockingServer::returnConnection(TConnection* connection) {
382 connectionStack_.push(connection);
383}
384
385/**
386 * Server socket had something happen
387 */
388void TNonblockingServer::handleEvent(int fd, short which) {
389 // Make sure that libevent didn't fuck up the socket handles
390 assert(fd == serverSocket_);
391
392 // Server socket accepted a new connection
393 socklen_t addrLen;
394 struct sockaddr addr;
395 addrLen = sizeof(addr);
396
397 // Going to accept a new client socket
398 int clientSocket;
399
400 // Accept as many new clients as possible, even though libevent signaled only
401 // one, this helps us to avoid having to go back into the libevent engine so
402 // many times
403 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
404
405 // Explicitly set this socket to NONBLOCK mode
406 int flags;
407 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
408 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
409 perror("thriftServerEventHandler: set O_NONBLOCK");
410 close(clientSocket);
411 return;
412 }
413
414 // Create a new TConnection for this client socket.
415 TConnection* clientConnection =
416 createConnection(clientSocket, EV_READ | EV_PERSIST);
417
418 // Fail fast if we could not create a TConnection object
419 if (clientConnection == NULL) {
420 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
421 close(clientSocket);
422 return;
423 }
424
425 // Put this client connection into the proper state
426 clientConnection->transition();
427 }
428
429 // Done looping accept, now we have to make sure the error is due to
430 // blocking. Any other error is a problem
431 if (errno != EAGAIN && errno != EWOULDBLOCK) {
432 perror("thriftServerEventHandler: accept()");
433 }
434}
435
436/**
437 * Main workhorse function, starts up the server listening on a port and
438 * loops over the libevent handler.
439 */
440void TNonblockingServer::serve() {
441 // Initialize libevent
442 event_init();
443
444 // Print some libevent stats
445 fprintf(stderr,
446 "libevent %s method %s\n",
447 event_get_version(),
448 event_get_method());
449
450 // Create the server socket
451 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
452 if (serverSocket_ == -1) {
453 perror("TNonblockingServer::serve() socket() -1");
454 return;
455 }
456
457 // Set socket to nonblocking mode
458 int flags;
459 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
460 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
461 perror("TNonblockingServer::serve() O_NONBLOCK");
462 ::close(serverSocket_);
463 return;
464 }
465
466 int one = 1;
467 struct linger ling = {0, 0};
468
469 // Set reuseaddr to avoid 2MSL delay on server restart
470 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
471
472 // Keepalive to ensure full result flushing
473 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
474
475 // Turn linger off to avoid hung sockets
476 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
477
478 // Set TCP nodelay if available, MAC OS X Hack
479 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
480 #ifndef TCP_NOPUSH
481 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
482 #endif
483
484 struct sockaddr_in addr;
485 addr.sin_family = AF_INET;
486 addr.sin_port = htons(port_);
487 addr.sin_addr.s_addr = INADDR_ANY;
488
489 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
490 perror("TNonblockingServer::serve() bind");
491 close(serverSocket_);
492 return;
493 }
494
495 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
496 perror("TNonblockingServer::serve() listen");
497 close(serverSocket_);
498 return;
499 }
500
501 // Register the server event
502 struct event serverEvent;
503 event_set(&serverEvent,
504 serverSocket_,
505 EV_READ | EV_PERSIST,
506 TNonblockingServer::eventHandler,
507 this);
508
509 // Add the event and start up the server
510 if (event_add(&serverEvent, 0) == -1) {
511 perror("TNonblockingServer::serve(): coult not event_add");
512 return;
513 }
514
515 // Run libevent engine, never returns, invokes calls to event_handler
516 event_loop(0);
517}
518
519}}} // facebook::thrift::server