blob: a7c8393dfc4de4c62fedb42f21e48f22c1f7ecf0 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#include "TNonblockingServer.h"
2
3#include <sys/socket.h>
4#include <netinet/in.h>
5#include <netinet/tcp.h>
6#include <fcntl.h>
7#include <errno.h>
8#include <assert.h>
9
10namespace facebook { namespace thrift { namespace server {
11
12void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
13 socket_ = socket;
14 server_ = s;
15 appState_ = APP_INIT;
16 eventFlags_ = 0;
17
18 readBufferPos_ = 0;
19 readWant_ = 0;
20
21 writeBuffer_ = NULL;
22 writeBufferSize_ = 0;
23 writeBufferPos_ = 0;
24
25 socketState_ = SOCKET_RECV;
26 appState_ = APP_INIT;
27
28 // Set flags, which also registers the event
29 setFlags(eventFlags);
30}
31
32void TConnection::workSocket() {
33 int flags;
34
35 switch (socketState_) {
36 case SOCKET_RECV:
37 // It is an error to be in this state if we already have all the data
38 assert(readBufferPos_ < readWant_);
39
40 // How much space is availble, and how much will we fetch
41 uint32_t avail = readBufferSize_ - readBufferPos_;
42 uint32_t fetch = readWant_ - readBufferPos_;
43
44 // Double the buffer size until it is big enough
45 if (fetch > avail) {
46 while (fetch > avail) {
47 readBufferSize_ *= 2;
48 }
49 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
50 if (readBuffer_ == NULL) {
51 perror("TConnection::workSocket() realloc");
52 close();
53 return;
54 }
55 }
56
57 // Read from the socket
58 int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
59
60 if (got > 0) {
61 // Move along in the buffer
62 readBufferPos_ += got;
63
64 // Check that we did not overdo it
65 assert(readBufferPos_ <= readWant_);
66
67 // We are done reading, move onto the next state
68 if (readBufferPos_ == readWant_) {
69 transition();
70 }
71 return;
72 } else if (got == -1) {
73 // Blocking errors are okay, just move on
74 if (errno == EAGAIN || errno == EWOULDBLOCK) {
75 return;
76 }
77
78 if (errno != ECONNRESET) {
79 perror("TConnection::workSocket() recv -1");
80 }
81 }
82
83 // Whenever we get down here it means a remote disconnect
84 close();
85
86 return;
87
88 case SOCKET_SEND:
89 // Should never have position past size
90 assert(writeBufferPos_ <= writeBufferSize_);
91
92 // If there is no data to send, then let us move on
93 if (writeBufferPos_ == writeBufferSize_) {
94 fprintf(stderr, "WARNING: Send state with no data to send\n");
95 transition();
96 return;
97 }
98
99 flags = 0;
100 #ifdef MSG_NOSIGNAL
101 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
102 // check for the EPIPE return condition and close the socket in that case
103 flags |= MSG_NOSIGNAL;
104 #endif // ifdef MSG_NOSIGNAL
105
106 int left = writeBufferSize_ - writeBufferPos_;
107 int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
108
109 if (sent <= 0) {
110 // Blocking errors are okay, just move on
111 if (errno == EAGAIN || errno == EWOULDBLOCK) {
112 return;
113 }
114 if (errno != EPIPE) {
115 perror("TConnection::workSocket() send -1");
116 }
117 close();
118 return;
119 }
120
121 writeBufferPos_ += sent;
122
123 // Did we overdo it?
124 assert(writeBufferPos_ <= writeBufferSize_);
125
126 // We are done!
127 if (writeBufferPos_ == writeBufferSize_) {
128 transition();
129 }
130
131 return;
132
133 default:
134 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
135 assert(0);
136 }
137}
138
139/**
140 * This is called when the application transitions from one state into
141 * another. This means that it has finished writing the data that it needed
142 * to, or finished receiving the data that it needed to.
143 */
144void TConnection::transition() {
145 // Switch upon the state that we are currently in and move to a new state
146 switch (appState_) {
147
148 case APP_READ_REQUEST:
149 // We are done reading the request, package the read buffer into transport
150 // and get back some data from the dispatch function
151 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
152 outputTransport_->resetBuffer();
153
154 try {
155 // Invoke the processor
Mark Slee4af6ed72006-10-25 19:02:49 +0000156 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000157 } catch (TTransportException &ttx) {
158 fprintf(stderr, "Server::process() %s\n", ttx.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 close();
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000160 return;
161 } catch (TException &x) {
162 fprintf(stderr, "Server::process() %s\n", x.what());
163 close();
164 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000165 } catch (...) {
166 fprintf(stderr, "Server::process() unknown exception\n");
167 close();
168 return;
169 }
170
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 // Get the result of the operation
172 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
173
174 // If the function call generated return data, then move into the send
175 // state and get going
176 if (writeBufferSize_ > 0) {
177
178 // Move into write state
179 writeBufferPos_ = 0;
180 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000181
182 if (server_->getFrameResponses()) {
183 // Put the frame size into the write buffer
184 appState_ = APP_SEND_FRAME_SIZE;
185 frameSize_ = (int32_t)htonl(writeBufferSize_);
186 writeBuffer_ = (uint8_t*)&frameSize_;
187 writeBufferSize_ = 4;
188 } else {
189 // Go straight into sending the result, do not frame it
190 appState_ = APP_SEND_RESULT;
191 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000192
193 // Socket into write mode
194 setWrite();
195
196 // Try to work the socket immediately
197 workSocket();
198
199 return;
200 }
201
202 // In this case, the request was asynchronous and we should fall through
203 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000204 goto LABEL_APP_INIT;
205
206 case APP_SEND_FRAME_SIZE:
207
208 // Refetch the result of the operation since we put the frame size into
209 // writeBuffer_
210 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
211 writeBufferPos_ = 0;
212
213 // Now in send result state
214 appState_ = APP_SEND_RESULT;
215
216 // Go to work on the socket right away, probably still writeable
217 workSocket();
218
219 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000220
221 case APP_SEND_RESULT:
222
223 // N.B.: We also intentionally fall through here into the INIT state!
224
Mark Slee92f00fb2006-10-25 01:28:17 +0000225 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000226 case APP_INIT:
227
228 // Clear write buffer variables
229 writeBuffer_ = NULL;
230 writeBufferPos_ = 0;
231 writeBufferSize_ = 0;
232
233 // Set up read buffer for getting 4 bytes
234 readBufferPos_ = 0;
235 readWant_ = 4;
236
237 // Into read4 state we go
238 socketState_ = SOCKET_RECV;
239 appState_ = APP_READ_FRAME_SIZE;
240
241 // Register read event
242 setRead();
243
244 // Try to work the socket right away
245 workSocket();
246
247 return;
248
249 case APP_READ_FRAME_SIZE:
250 // We just read the request length, deserialize it
251 int sz = *(int32_t*)readBuffer_;
252 sz = (int32_t)ntohl(sz);
253
254 if (sz <= 0) {
255 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
256 close();
257 return;
258 }
259
260 // Reset the read buffer
261 readWant_ = (uint32_t)sz;
262 readBufferPos_= 0;
263
264 // Move into read request state
265 appState_ = APP_READ_REQUEST;
266
267 // Work the socket right away
268 workSocket();
269
270 return;
271
272 default:
273 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
274 assert(0);
275 }
276}
277
278void TConnection::setFlags(short eventFlags) {
279 // Catch the do nothing case
280 if (eventFlags_ == eventFlags) {
281 return;
282 }
283
284 // Delete a previously existing event
285 if (eventFlags_ != 0) {
286 if (event_del(&event_) == -1) {
287 perror("TConnection::setFlags event_del");
288 return;
289 }
290 }
291
292 // Update in memory structure
293 eventFlags_ = eventFlags;
294
295 /**
296 * event_set:
297 *
298 * Prepares the event structure &event to be used in future calls to
299 * event_add() and event_del(). The event will be prepared to call the
300 * event_handler using the 'sock' file descriptor to monitor events.
301 *
302 * The events can be either EV_READ, EV_WRITE, or both, indicating
303 * that an application can read or write from the file respectively without
304 * blocking.
305 *
306 * The event_handler will be called with the file descriptor that triggered
307 * the event and the type of event which will be one of: EV_TIMEOUT,
308 * EV_SIGNAL, EV_READ, EV_WRITE.
309 *
310 * The additional flag EV_PERSIST makes an event_add() persistent until
311 * event_del() has been called.
312 *
313 * Once initialized, the &event struct can be used repeatedly with
314 * event_add() and event_del() and does not need to be reinitialized unless
315 * the event_handler and/or the argument to it are to be changed. However,
316 * when an ev structure has been added to libevent using event_add() the
317 * structure must persist until the event occurs (assuming EV_PERSIST
318 * is not set) or is removed using event_del(). You may not reuse the same
319 * ev structure for multiple monitored descriptors; each descriptor needs
320 * its own ev.
321 */
322 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
323
324 // Add the event
325 if (event_add(&event_, 0) == -1) {
326 perror("TConnection::setFlags(): coult not event_add");
327 }
328}
329
330/**
331 * Closes a connection
332 */
333void TConnection::close() {
334 // Delete the registered libevent
335 if (event_del(&event_) == -1) {
336 perror("TConnection::close() event_del");
337 }
338
339 // Close the socket
340 if (socket_ > 0) {
341 ::close(socket_);
342 }
343 socket_ = 0;
344
345 // Give this object back to the server that owns it
346 server_->returnConnection(this);
347}
348
349/**
350 * Creates a new connection either by reusing an object off the stack or
351 * by allocating a new one entirely
352 */
353TConnection* TNonblockingServer::createConnection(int socket, short flags) {
354 // Check the stack
355 if (connectionStack_.empty()) {
356 return new TConnection(socket, flags, this);
357 } else {
358 TConnection* result = connectionStack_.top();
359 connectionStack_.pop();
360 result->init(socket, flags, this);
361 return result;
362 }
363}
364
365/**
366 * Returns a connection to the stack
367 */
368void TNonblockingServer::returnConnection(TConnection* connection) {
369 connectionStack_.push(connection);
370}
371
372/**
373 * Server socket had something happen
374 */
375void TNonblockingServer::handleEvent(int fd, short which) {
376 // Make sure that libevent didn't fuck up the socket handles
377 assert(fd == serverSocket_);
378
379 // Server socket accepted a new connection
380 socklen_t addrLen;
381 struct sockaddr addr;
382 addrLen = sizeof(addr);
383
384 // Going to accept a new client socket
385 int clientSocket;
386
387 // Accept as many new clients as possible, even though libevent signaled only
388 // one, this helps us to avoid having to go back into the libevent engine so
389 // many times
390 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
391
392 // Explicitly set this socket to NONBLOCK mode
393 int flags;
394 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
395 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
396 perror("thriftServerEventHandler: set O_NONBLOCK");
397 close(clientSocket);
398 return;
399 }
400
401 // Create a new TConnection for this client socket.
402 TConnection* clientConnection =
403 createConnection(clientSocket, EV_READ | EV_PERSIST);
404
405 // Fail fast if we could not create a TConnection object
406 if (clientConnection == NULL) {
407 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
408 close(clientSocket);
409 return;
410 }
411
412 // Put this client connection into the proper state
413 clientConnection->transition();
414 }
415
416 // Done looping accept, now we have to make sure the error is due to
417 // blocking. Any other error is a problem
418 if (errno != EAGAIN && errno != EWOULDBLOCK) {
419 perror("thriftServerEventHandler: accept()");
420 }
421}
422
423/**
424 * Main workhorse function, starts up the server listening on a port and
425 * loops over the libevent handler.
426 */
427void TNonblockingServer::serve() {
428 // Initialize libevent
429 event_init();
430
431 // Print some libevent stats
432 fprintf(stderr,
433 "libevent %s method %s\n",
434 event_get_version(),
435 event_get_method());
436
437 // Create the server socket
438 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
439 if (serverSocket_ == -1) {
440 perror("TNonblockingServer::serve() socket() -1");
441 return;
442 }
443
444 // Set socket to nonblocking mode
445 int flags;
446 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
447 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
448 perror("TNonblockingServer::serve() O_NONBLOCK");
449 ::close(serverSocket_);
450 return;
451 }
452
453 int one = 1;
454 struct linger ling = {0, 0};
455
456 // Set reuseaddr to avoid 2MSL delay on server restart
457 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
458
459 // Keepalive to ensure full result flushing
460 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
461
462 // Turn linger off to avoid hung sockets
463 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
464
465 // Set TCP nodelay if available, MAC OS X Hack
466 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
467 #ifndef TCP_NOPUSH
468 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
469 #endif
470
471 struct sockaddr_in addr;
472 addr.sin_family = AF_INET;
473 addr.sin_port = htons(port_);
474 addr.sin_addr.s_addr = INADDR_ANY;
475
476 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
477 perror("TNonblockingServer::serve() bind");
478 close(serverSocket_);
479 return;
480 }
481
482 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
483 perror("TNonblockingServer::serve() listen");
484 close(serverSocket_);
485 return;
486 }
487
488 // Register the server event
489 struct event serverEvent;
490 event_set(&serverEvent,
491 serverSocket_,
492 EV_READ | EV_PERSIST,
493 TNonblockingServer::eventHandler,
494 this);
495
496 // Add the event and start up the server
497 if (event_add(&serverEvent, 0) == -1) {
498 perror("TNonblockingServer::serve(): coult not event_add");
499 return;
500 }
501
502 // Run libevent engine, never returns, invokes calls to event_handler
503 event_loop(0);
504}
505
506}}} // facebook::thrift::server