Add an option to frame responses in the nonblocking server
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664835 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cc b/lib/cpp/src/server/TNonblockingServer.cc
index 14fb5bc..bc39877 100644
--- a/lib/cpp/src/server/TNonblockingServer.cc
+++ b/lib/cpp/src/server/TNonblockingServer.cc
@@ -164,7 +164,6 @@
return;
}
-
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
@@ -175,7 +174,17 @@
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND;
- appState_ = APP_SEND_RESULT;
+
+ if (server_->getFrameResponses()) {
+ // Send the 4-byte frame size (network byte order) before the result
+ appState_ = APP_SEND_FRAME_SIZE;
+ frameSize_ = (int32_t)htonl(writeBufferSize_);
+ writeBuffer_ = (uint8_t*)&frameSize_;
+ writeBufferSize_ = 4;
+ } else {
+ // Go straight into sending the result, do not frame it
+ appState_ = APP_SEND_RESULT;
+ }
// Socket into write mode
setWrite();
@@ -188,11 +197,28 @@
// In this case, the request was asynchronous and we should fall through
// right back into the read frame header state
+ goto LABEL_APP_INIT;
+
+ case APP_SEND_FRAME_SIZE:
+
+ // Refetch the result of the operation since we put the frame size into
+ // writeBuffer_
+ outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+ writeBufferPos_ = 0;
+
+ // Now in send result state
+ appState_ = APP_SEND_RESULT;
+
+ // Go to work on the socket right away, probably still writeable
+ workSocket();
+
+ return;
case APP_SEND_RESULT:
// N.B.: We also intentionally fall through here into the INIT state!
+ LABEL_APP_INIT:
case APP_INIT:
// Clear write buffer variables
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 565486c..ec024c0 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -7,8 +7,6 @@
#include <stack>
#include <event.h>
-#
-
namespace facebook { namespace thrift { namespace server {
using boost::shared_ptr;
@@ -38,6 +36,9 @@
// Port server runs on
int port_;
+ // Whether to send a 4-byte frame size ahead of each response (default: true)
+ bool frameResponses_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -52,10 +53,21 @@
TNonblockingServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerOptions> options,
int port) :
- TServer(processor, options), serverSocket_(0), port_(port) {}
+ TServer(processor, options),
+ serverSocket_(0),
+ port_(port),
+ frameResponses_(true) {}
~TNonblockingServer() {}
+ void setFrameResponses(bool frameResponses) {
+ frameResponses_ = frameResponses;
+ }
+
+ bool getFrameResponses() {
+ return frameResponses_;
+ }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
@@ -86,6 +98,7 @@
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
+ APP_SEND_FRAME_SIZE,
APP_SEND_RESULT
};
@@ -135,6 +148,9 @@
// How far through writing are we?
uint32_t writeBufferPos_;
+ // Size of the response frame in network byte order; sent ahead of the result
+ int32_t frameSize_;
+
// Transport to read from
shared_ptr<TMemoryBuffer> inputTransport_;