blob: 25cd7b5ac68cac860fca313ab85362d3b88697ce [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#include "TNonblockingServer.h"
2
3#include <sys/socket.h>
4#include <netinet/in.h>
5#include <netinet/tcp.h>
6#include <fcntl.h>
7#include <errno.h>
8#include <assert.h>
9
10namespace facebook { namespace thrift { namespace server {
11
Aditya Agarwal1ea90522007-01-19 02:02:12 +000012 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000013 socket_ = socket;
14 server_ = s;
15 appState_ = APP_INIT;
16 eventFlags_ = 0;
17
18 readBufferPos_ = 0;
19 readWant_ = 0;
20
21 writeBuffer_ = NULL;
22 writeBufferSize_ = 0;
23 writeBufferPos_ = 0;
24
25 socketState_ = SOCKET_RECV;
26 appState_ = APP_INIT;
27
28 // Set flags, which also registers the event
29 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000030
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000031 // get input/transports
32 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
33 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000034
35 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000036 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
37 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000038}
39
40void TConnection::workSocket() {
41 int flags;
42
43 switch (socketState_) {
44 case SOCKET_RECV:
45 // It is an error to be in this state if we already have all the data
46 assert(readBufferPos_ < readWant_);
47
Mark Slee2f6404d2006-10-10 01:37:40 +000048 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +000049 if (readWant_ > readBufferSize_) {
50 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +000051 readBufferSize_ *= 2;
52 }
53 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
54 if (readBuffer_ == NULL) {
55 perror("TConnection::workSocket() realloc");
56 close();
57 return;
58 }
59 }
60
61 // Read from the socket
robert79511192006-12-20 19:25:38 +000062 uint32_t fetch = readWant_ - readBufferPos_;
Mark Slee2f6404d2006-10-10 01:37:40 +000063 int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
64
65 if (got > 0) {
66 // Move along in the buffer
67 readBufferPos_ += got;
68
69 // Check that we did not overdo it
70 assert(readBufferPos_ <= readWant_);
71
72 // We are done reading, move onto the next state
73 if (readBufferPos_ == readWant_) {
74 transition();
75 }
76 return;
77 } else if (got == -1) {
78 // Blocking errors are okay, just move on
79 if (errno == EAGAIN || errno == EWOULDBLOCK) {
80 return;
81 }
82
83 if (errno != ECONNRESET) {
84 perror("TConnection::workSocket() recv -1");
85 }
86 }
87
88 // Whenever we get down here it means a remote disconnect
89 close();
90
91 return;
92
93 case SOCKET_SEND:
94 // Should never have position past size
95 assert(writeBufferPos_ <= writeBufferSize_);
96
97 // If there is no data to send, then let us move on
98 if (writeBufferPos_ == writeBufferSize_) {
99 fprintf(stderr, "WARNING: Send state with no data to send\n");
100 transition();
101 return;
102 }
103
104 flags = 0;
105 #ifdef MSG_NOSIGNAL
106 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
107 // check for the EPIPE return condition and close the socket in that case
108 flags |= MSG_NOSIGNAL;
109 #endif // ifdef MSG_NOSIGNAL
110
111 int left = writeBufferSize_ - writeBufferPos_;
112 int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
113
114 if (sent <= 0) {
115 // Blocking errors are okay, just move on
116 if (errno == EAGAIN || errno == EWOULDBLOCK) {
117 return;
118 }
119 if (errno != EPIPE) {
120 perror("TConnection::workSocket() send -1");
121 }
122 close();
123 return;
124 }
125
126 writeBufferPos_ += sent;
127
128 // Did we overdo it?
129 assert(writeBufferPos_ <= writeBufferSize_);
130
131 // We are done!
132 if (writeBufferPos_ == writeBufferSize_) {
133 transition();
134 }
135
136 return;
137
138 default:
139 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
140 assert(0);
141 }
142}
143
144/**
145 * This is called when the application transitions from one state into
146 * another. This means that it has finished writing the data that it needed
147 * to, or finished receiving the data that it needed to.
148 */
149void TConnection::transition() {
150 // Switch upon the state that we are currently in and move to a new state
151 switch (appState_) {
152
153 case APP_READ_REQUEST:
154 // We are done reading the request, package the read buffer into transport
155 // and get back some data from the dispatch function
156 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
157 outputTransport_->resetBuffer();
158
159 try {
160 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000161 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000162 } catch (TTransportException &ttx) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000163 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000165 return;
166 } catch (TException &x) {
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000167 fprintf(stderr, "TException: Server::process() %s\n", x.what());
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000168 close();
169 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000170 } catch (...) {
171 fprintf(stderr, "Server::process() unknown exception\n");
172 close();
173 return;
174 }
175
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 // Get the result of the operation
177 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
178
179 // If the function call generated return data, then move into the send
180 // state and get going
181 if (writeBufferSize_ > 0) {
182
183 // Move into write state
184 writeBufferPos_ = 0;
185 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000186
187 if (server_->getFrameResponses()) {
188 // Put the frame size into the write buffer
189 appState_ = APP_SEND_FRAME_SIZE;
190 frameSize_ = (int32_t)htonl(writeBufferSize_);
191 writeBuffer_ = (uint8_t*)&frameSize_;
192 writeBufferSize_ = 4;
193 } else {
194 // Go straight into sending the result, do not frame it
195 appState_ = APP_SEND_RESULT;
196 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000197
198 // Socket into write mode
199 setWrite();
200
201 // Try to work the socket immediately
202 workSocket();
203
204 return;
205 }
206
207 // In this case, the request was asynchronous and we should fall through
208 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000209 goto LABEL_APP_INIT;
210
211 case APP_SEND_FRAME_SIZE:
212
213 // Refetch the result of the operation since we put the frame size into
214 // writeBuffer_
215 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
216 writeBufferPos_ = 0;
217
218 // Now in send result state
219 appState_ = APP_SEND_RESULT;
220
221 // Go to work on the socket right away, probably still writeable
222 workSocket();
223
224 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000225
226 case APP_SEND_RESULT:
227
228 // N.B.: We also intentionally fall through here into the INIT state!
229
Mark Slee92f00fb2006-10-25 01:28:17 +0000230 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000231 case APP_INIT:
232
233 // Clear write buffer variables
234 writeBuffer_ = NULL;
235 writeBufferPos_ = 0;
236 writeBufferSize_ = 0;
237
238 // Set up read buffer for getting 4 bytes
239 readBufferPos_ = 0;
240 readWant_ = 4;
241
242 // Into read4 state we go
243 socketState_ = SOCKET_RECV;
244 appState_ = APP_READ_FRAME_SIZE;
245
246 // Register read event
247 setRead();
248
249 // Try to work the socket right away
250 workSocket();
251
252 return;
253
254 case APP_READ_FRAME_SIZE:
255 // We just read the request length, deserialize it
256 int sz = *(int32_t*)readBuffer_;
257 sz = (int32_t)ntohl(sz);
258
259 if (sz <= 0) {
260 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
261 close();
262 return;
263 }
264
265 // Reset the read buffer
266 readWant_ = (uint32_t)sz;
267 readBufferPos_= 0;
268
269 // Move into read request state
270 appState_ = APP_READ_REQUEST;
271
272 // Work the socket right away
273 workSocket();
274
275 return;
276
277 default:
278 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
279 assert(0);
280 }
281}
282
283void TConnection::setFlags(short eventFlags) {
284 // Catch the do nothing case
285 if (eventFlags_ == eventFlags) {
286 return;
287 }
288
289 // Delete a previously existing event
290 if (eventFlags_ != 0) {
291 if (event_del(&event_) == -1) {
292 perror("TConnection::setFlags event_del");
293 return;
294 }
295 }
296
297 // Update in memory structure
298 eventFlags_ = eventFlags;
299
300 /**
301 * event_set:
302 *
303 * Prepares the event structure &event to be used in future calls to
304 * event_add() and event_del(). The event will be prepared to call the
305 * event_handler using the 'sock' file descriptor to monitor events.
306 *
307 * The events can be either EV_READ, EV_WRITE, or both, indicating
308 * that an application can read or write from the file respectively without
309 * blocking.
310 *
311 * The event_handler will be called with the file descriptor that triggered
312 * the event and the type of event which will be one of: EV_TIMEOUT,
313 * EV_SIGNAL, EV_READ, EV_WRITE.
314 *
315 * The additional flag EV_PERSIST makes an event_add() persistent until
316 * event_del() has been called.
317 *
318 * Once initialized, the &event struct can be used repeatedly with
319 * event_add() and event_del() and does not need to be reinitialized unless
320 * the event_handler and/or the argument to it are to be changed. However,
321 * when an ev structure has been added to libevent using event_add() the
322 * structure must persist until the event occurs (assuming EV_PERSIST
323 * is not set) or is removed using event_del(). You may not reuse the same
324 * ev structure for multiple monitored descriptors; each descriptor needs
325 * its own ev.
326 */
327 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
328
329 // Add the event
330 if (event_add(&event_, 0) == -1) {
331 perror("TConnection::setFlags(): coult not event_add");
332 }
333}
334
335/**
336 * Closes a connection
337 */
338void TConnection::close() {
339 // Delete the registered libevent
340 if (event_del(&event_) == -1) {
341 perror("TConnection::close() event_del");
342 }
343
344 // Close the socket
345 if (socket_ > 0) {
346 ::close(socket_);
347 }
348 socket_ = 0;
349
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000350 // close any factory produced transports
351 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000352 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000353
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 // Give this object back to the server that owns it
355 server_->returnConnection(this);
356}
357
358/**
359 * Creates a new connection either by reusing an object off the stack or
360 * by allocating a new one entirely
361 */
362TConnection* TNonblockingServer::createConnection(int socket, short flags) {
363 // Check the stack
364 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000365 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000366 } else {
367 TConnection* result = connectionStack_.top();
368 connectionStack_.pop();
369 result->init(socket, flags, this);
370 return result;
371 }
372}
373
374/**
375 * Returns a connection to the stack
376 */
377void TNonblockingServer::returnConnection(TConnection* connection) {
378 connectionStack_.push(connection);
379}
380
381/**
382 * Server socket had something happen
383 */
384void TNonblockingServer::handleEvent(int fd, short which) {
385 // Make sure that libevent didn't fuck up the socket handles
386 assert(fd == serverSocket_);
387
388 // Server socket accepted a new connection
389 socklen_t addrLen;
390 struct sockaddr addr;
391 addrLen = sizeof(addr);
392
393 // Going to accept a new client socket
394 int clientSocket;
395
396 // Accept as many new clients as possible, even though libevent signaled only
397 // one, this helps us to avoid having to go back into the libevent engine so
398 // many times
399 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
400
401 // Explicitly set this socket to NONBLOCK mode
402 int flags;
403 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
404 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
405 perror("thriftServerEventHandler: set O_NONBLOCK");
406 close(clientSocket);
407 return;
408 }
409
410 // Create a new TConnection for this client socket.
411 TConnection* clientConnection =
412 createConnection(clientSocket, EV_READ | EV_PERSIST);
413
414 // Fail fast if we could not create a TConnection object
415 if (clientConnection == NULL) {
416 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
417 close(clientSocket);
418 return;
419 }
420
421 // Put this client connection into the proper state
422 clientConnection->transition();
423 }
424
425 // Done looping accept, now we have to make sure the error is due to
426 // blocking. Any other error is a problem
427 if (errno != EAGAIN && errno != EWOULDBLOCK) {
428 perror("thriftServerEventHandler: accept()");
429 }
430}
431
432/**
433 * Main workhorse function, starts up the server listening on a port and
434 * loops over the libevent handler.
435 */
436void TNonblockingServer::serve() {
437 // Initialize libevent
438 event_init();
439
440 // Print some libevent stats
441 fprintf(stderr,
442 "libevent %s method %s\n",
443 event_get_version(),
444 event_get_method());
445
446 // Create the server socket
447 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
448 if (serverSocket_ == -1) {
449 perror("TNonblockingServer::serve() socket() -1");
450 return;
451 }
452
453 // Set socket to nonblocking mode
454 int flags;
455 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
456 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
457 perror("TNonblockingServer::serve() O_NONBLOCK");
458 ::close(serverSocket_);
459 return;
460 }
461
462 int one = 1;
463 struct linger ling = {0, 0};
464
465 // Set reuseaddr to avoid 2MSL delay on server restart
466 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
467
468 // Keepalive to ensure full result flushing
469 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
470
471 // Turn linger off to avoid hung sockets
472 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
473
474 // Set TCP nodelay if available, MAC OS X Hack
475 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
476 #ifndef TCP_NOPUSH
477 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
478 #endif
479
480 struct sockaddr_in addr;
481 addr.sin_family = AF_INET;
482 addr.sin_port = htons(port_);
483 addr.sin_addr.s_addr = INADDR_ANY;
484
485 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
486 perror("TNonblockingServer::serve() bind");
487 close(serverSocket_);
488 return;
489 }
490
491 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
492 perror("TNonblockingServer::serve() listen");
493 close(serverSocket_);
494 return;
495 }
496
497 // Register the server event
498 struct event serverEvent;
499 event_set(&serverEvent,
500 serverSocket_,
501 EV_READ | EV_PERSIST,
502 TNonblockingServer::eventHandler,
503 this);
504
505 // Add the event and start up the server
506 if (event_add(&serverEvent, 0) == -1) {
507 perror("TNonblockingServer::serve(): coult not event_add");
508 return;
509 }
510
511 // Run libevent engine, never returns, invokes calls to event_handler
512 event_loop(0);
513}
514
515}}} // facebook::thrift::server