blob: 9fb5c232dc9c8af1834a0348527fe83377c79049 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#include "TNonblockingServer.h"
2
3#include <sys/socket.h>
4#include <netinet/in.h>
5#include <netinet/tcp.h>
6#include <fcntl.h>
7#include <errno.h>
8#include <assert.h>
9
10namespace facebook { namespace thrift { namespace server {
11
12void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
13 socket_ = socket;
14 server_ = s;
15 appState_ = APP_INIT;
16 eventFlags_ = 0;
17
18 readBufferPos_ = 0;
19 readWant_ = 0;
20
21 writeBuffer_ = NULL;
22 writeBufferSize_ = 0;
23 writeBufferPos_ = 0;
24
25 socketState_ = SOCKET_RECV;
26 appState_ = APP_INIT;
27
28 // Set flags, which also registers the event
29 setFlags(eventFlags);
30}
31
32void TConnection::workSocket() {
33 int flags;
34
35 switch (socketState_) {
36 case SOCKET_RECV:
37 // It is an error to be in this state if we already have all the data
38 assert(readBufferPos_ < readWant_);
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +000041 if (readWant_ > readBufferSize_) {
42 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +000043 readBufferSize_ *= 2;
44 }
45 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
46 if (readBuffer_ == NULL) {
47 perror("TConnection::workSocket() realloc");
48 close();
49 return;
50 }
51 }
52
53 // Read from the socket
robert79511192006-12-20 19:25:38 +000054 uint32_t fetch = readWant_ - readBufferPos_;
Mark Slee2f6404d2006-10-10 01:37:40 +000055 int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
56
57 if (got > 0) {
58 // Move along in the buffer
59 readBufferPos_ += got;
60
61 // Check that we did not overdo it
62 assert(readBufferPos_ <= readWant_);
63
64 // We are done reading, move onto the next state
65 if (readBufferPos_ == readWant_) {
66 transition();
67 }
68 return;
69 } else if (got == -1) {
70 // Blocking errors are okay, just move on
71 if (errno == EAGAIN || errno == EWOULDBLOCK) {
72 return;
73 }
74
75 if (errno != ECONNRESET) {
76 perror("TConnection::workSocket() recv -1");
77 }
78 }
79
80 // Whenever we get down here it means a remote disconnect
81 close();
82
83 return;
84
85 case SOCKET_SEND:
86 // Should never have position past size
87 assert(writeBufferPos_ <= writeBufferSize_);
88
89 // If there is no data to send, then let us move on
90 if (writeBufferPos_ == writeBufferSize_) {
91 fprintf(stderr, "WARNING: Send state with no data to send\n");
92 transition();
93 return;
94 }
95
96 flags = 0;
97 #ifdef MSG_NOSIGNAL
98 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
99 // check for the EPIPE return condition and close the socket in that case
100 flags |= MSG_NOSIGNAL;
101 #endif // ifdef MSG_NOSIGNAL
102
103 int left = writeBufferSize_ - writeBufferPos_;
104 int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
105
106 if (sent <= 0) {
107 // Blocking errors are okay, just move on
108 if (errno == EAGAIN || errno == EWOULDBLOCK) {
109 return;
110 }
111 if (errno != EPIPE) {
112 perror("TConnection::workSocket() send -1");
113 }
114 close();
115 return;
116 }
117
118 writeBufferPos_ += sent;
119
120 // Did we overdo it?
121 assert(writeBufferPos_ <= writeBufferSize_);
122
123 // We are done!
124 if (writeBufferPos_ == writeBufferSize_) {
125 transition();
126 }
127
128 return;
129
130 default:
131 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
132 assert(0);
133 }
134}
135
136/**
137 * This is called when the application transitions from one state into
138 * another. This means that it has finished writing the data that it needed
139 * to, or finished receiving the data that it needed to.
140 */
141void TConnection::transition() {
142 // Switch upon the state that we are currently in and move to a new state
143 switch (appState_) {
144
145 case APP_READ_REQUEST:
146 // We are done reading the request, package the read buffer into transport
147 // and get back some data from the dispatch function
148 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
149 outputTransport_->resetBuffer();
150
151 try {
152 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000153 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000154 } catch (TTransportException &ttx) {
155 fprintf(stderr, "Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000157 return;
158 } catch (TException &x) {
159 fprintf(stderr, "Server::process() %s\n", x.what());
160 close();
161 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 } catch (...) {
163 fprintf(stderr, "Server::process() unknown exception\n");
164 close();
165 return;
166 }
167
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 // Get the result of the operation
169 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
170
171 // If the function call generated return data, then move into the send
172 // state and get going
173 if (writeBufferSize_ > 0) {
174
175 // Move into write state
176 writeBufferPos_ = 0;
177 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000178
179 if (server_->getFrameResponses()) {
180 // Put the frame size into the write buffer
181 appState_ = APP_SEND_FRAME_SIZE;
182 frameSize_ = (int32_t)htonl(writeBufferSize_);
183 writeBuffer_ = (uint8_t*)&frameSize_;
184 writeBufferSize_ = 4;
185 } else {
186 // Go straight into sending the result, do not frame it
187 appState_ = APP_SEND_RESULT;
188 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000189
190 // Socket into write mode
191 setWrite();
192
193 // Try to work the socket immediately
194 workSocket();
195
196 return;
197 }
198
199 // In this case, the request was asynchronous and we should fall through
200 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000201 goto LABEL_APP_INIT;
202
203 case APP_SEND_FRAME_SIZE:
204
205 // Refetch the result of the operation since we put the frame size into
206 // writeBuffer_
207 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
208 writeBufferPos_ = 0;
209
210 // Now in send result state
211 appState_ = APP_SEND_RESULT;
212
213 // Go to work on the socket right away, probably still writeable
214 workSocket();
215
216 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000217
218 case APP_SEND_RESULT:
219
220 // N.B.: We also intentionally fall through here into the INIT state!
221
Mark Slee92f00fb2006-10-25 01:28:17 +0000222 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 case APP_INIT:
224
225 // Clear write buffer variables
226 writeBuffer_ = NULL;
227 writeBufferPos_ = 0;
228 writeBufferSize_ = 0;
229
230 // Set up read buffer for getting 4 bytes
231 readBufferPos_ = 0;
232 readWant_ = 4;
233
234 // Into read4 state we go
235 socketState_ = SOCKET_RECV;
236 appState_ = APP_READ_FRAME_SIZE;
237
238 // Register read event
239 setRead();
240
241 // Try to work the socket right away
242 workSocket();
243
244 return;
245
246 case APP_READ_FRAME_SIZE:
247 // We just read the request length, deserialize it
248 int sz = *(int32_t*)readBuffer_;
249 sz = (int32_t)ntohl(sz);
250
251 if (sz <= 0) {
252 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
253 close();
254 return;
255 }
256
257 // Reset the read buffer
258 readWant_ = (uint32_t)sz;
259 readBufferPos_= 0;
260
261 // Move into read request state
262 appState_ = APP_READ_REQUEST;
263
264 // Work the socket right away
265 workSocket();
266
267 return;
268
269 default:
270 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
271 assert(0);
272 }
273}
274
275void TConnection::setFlags(short eventFlags) {
276 // Catch the do nothing case
277 if (eventFlags_ == eventFlags) {
278 return;
279 }
280
281 // Delete a previously existing event
282 if (eventFlags_ != 0) {
283 if (event_del(&event_) == -1) {
284 perror("TConnection::setFlags event_del");
285 return;
286 }
287 }
288
289 // Update in memory structure
290 eventFlags_ = eventFlags;
291
292 /**
293 * event_set:
294 *
295 * Prepares the event structure &event to be used in future calls to
296 * event_add() and event_del(). The event will be prepared to call the
297 * event_handler using the 'sock' file descriptor to monitor events.
298 *
299 * The events can be either EV_READ, EV_WRITE, or both, indicating
300 * that an application can read or write from the file respectively without
301 * blocking.
302 *
303 * The event_handler will be called with the file descriptor that triggered
304 * the event and the type of event which will be one of: EV_TIMEOUT,
305 * EV_SIGNAL, EV_READ, EV_WRITE.
306 *
307 * The additional flag EV_PERSIST makes an event_add() persistent until
308 * event_del() has been called.
309 *
310 * Once initialized, the &event struct can be used repeatedly with
311 * event_add() and event_del() and does not need to be reinitialized unless
312 * the event_handler and/or the argument to it are to be changed. However,
313 * when an ev structure has been added to libevent using event_add() the
314 * structure must persist until the event occurs (assuming EV_PERSIST
315 * is not set) or is removed using event_del(). You may not reuse the same
316 * ev structure for multiple monitored descriptors; each descriptor needs
317 * its own ev.
318 */
319 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
320
321 // Add the event
322 if (event_add(&event_, 0) == -1) {
323 perror("TConnection::setFlags(): coult not event_add");
324 }
325}
326
327/**
328 * Closes a connection
329 */
330void TConnection::close() {
331 // Delete the registered libevent
332 if (event_del(&event_) == -1) {
333 perror("TConnection::close() event_del");
334 }
335
336 // Close the socket
337 if (socket_ > 0) {
338 ::close(socket_);
339 }
340 socket_ = 0;
341
342 // Give this object back to the server that owns it
343 server_->returnConnection(this);
344}
345
346/**
347 * Creates a new connection either by reusing an object off the stack or
348 * by allocating a new one entirely
349 */
350TConnection* TNonblockingServer::createConnection(int socket, short flags) {
351 // Check the stack
352 if (connectionStack_.empty()) {
353 return new TConnection(socket, flags, this);
354 } else {
355 TConnection* result = connectionStack_.top();
356 connectionStack_.pop();
357 result->init(socket, flags, this);
358 return result;
359 }
360}
361
362/**
363 * Returns a connection to the stack
364 */
365void TNonblockingServer::returnConnection(TConnection* connection) {
366 connectionStack_.push(connection);
367}
368
369/**
370 * Server socket had something happen
371 */
372void TNonblockingServer::handleEvent(int fd, short which) {
373 // Make sure that libevent didn't fuck up the socket handles
374 assert(fd == serverSocket_);
375
376 // Server socket accepted a new connection
377 socklen_t addrLen;
378 struct sockaddr addr;
379 addrLen = sizeof(addr);
380
381 // Going to accept a new client socket
382 int clientSocket;
383
384 // Accept as many new clients as possible, even though libevent signaled only
385 // one, this helps us to avoid having to go back into the libevent engine so
386 // many times
387 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
388
389 // Explicitly set this socket to NONBLOCK mode
390 int flags;
391 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
392 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
393 perror("thriftServerEventHandler: set O_NONBLOCK");
394 close(clientSocket);
395 return;
396 }
397
398 // Create a new TConnection for this client socket.
399 TConnection* clientConnection =
400 createConnection(clientSocket, EV_READ | EV_PERSIST);
401
402 // Fail fast if we could not create a TConnection object
403 if (clientConnection == NULL) {
404 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
405 close(clientSocket);
406 return;
407 }
408
409 // Put this client connection into the proper state
410 clientConnection->transition();
411 }
412
413 // Done looping accept, now we have to make sure the error is due to
414 // blocking. Any other error is a problem
415 if (errno != EAGAIN && errno != EWOULDBLOCK) {
416 perror("thriftServerEventHandler: accept()");
417 }
418}
419
420/**
421 * Main workhorse function, starts up the server listening on a port and
422 * loops over the libevent handler.
423 */
424void TNonblockingServer::serve() {
425 // Initialize libevent
426 event_init();
427
428 // Print some libevent stats
429 fprintf(stderr,
430 "libevent %s method %s\n",
431 event_get_version(),
432 event_get_method());
433
434 // Create the server socket
435 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
436 if (serverSocket_ == -1) {
437 perror("TNonblockingServer::serve() socket() -1");
438 return;
439 }
440
441 // Set socket to nonblocking mode
442 int flags;
443 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
444 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
445 perror("TNonblockingServer::serve() O_NONBLOCK");
446 ::close(serverSocket_);
447 return;
448 }
449
450 int one = 1;
451 struct linger ling = {0, 0};
452
453 // Set reuseaddr to avoid 2MSL delay on server restart
454 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
455
456 // Keepalive to ensure full result flushing
457 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
458
459 // Turn linger off to avoid hung sockets
460 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
461
462 // Set TCP nodelay if available, MAC OS X Hack
463 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
464 #ifndef TCP_NOPUSH
465 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
466 #endif
467
468 struct sockaddr_in addr;
469 addr.sin_family = AF_INET;
470 addr.sin_port = htons(port_);
471 addr.sin_addr.s_addr = INADDR_ANY;
472
473 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
474 perror("TNonblockingServer::serve() bind");
475 close(serverSocket_);
476 return;
477 }
478
479 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
480 perror("TNonblockingServer::serve() listen");
481 close(serverSocket_);
482 return;
483 }
484
485 // Register the server event
486 struct event serverEvent;
487 event_set(&serverEvent,
488 serverSocket_,
489 EV_READ | EV_PERSIST,
490 TNonblockingServer::eventHandler,
491 this);
492
493 // Add the event and start up the server
494 if (event_add(&serverEvent, 0) == -1) {
495 perror("TNonblockingServer::serve(): coult not event_add");
496 return;
497 }
498
499 // Run libevent engine, never returns, invokes calls to event_handler
500 event_loop(0);
501}
502
503}}} // facebook::thrift::server