THRIFT-928. cpp: Thrift Server Client Stats
Add the ability for Thrift servers to monitor client connections. It is
activated by #including server/TClientInfo.h and creating 1) a
TClientInfoCallHandler passed to the processor with setEventHandler()
and 2) a TClientInforServerHandler passed to the server with
setServerEventHandler().
The result vector, showing active connections, provides client address
and the thrift call it is executing (or last executed), the time
connected, and the number of calls made since connection.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005139 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 85fe265..4245d5e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -56,7 +56,7 @@
void run() {
try {
- while (processor_->process(input_, output_)) {
+ while (processor_->process(input_, output_, NULL)) {
if (!input_->getTransport()->peek()) {
break;
}
@@ -293,7 +293,7 @@
} else {
try {
// Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
} catch (TTransportException &ttx) {
GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
server_->decrementActiveProcessors();
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 5c4c588..4dddfea 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -57,14 +57,33 @@
/**
* Called when a new client has connected and is about to being processing.
*/
- virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
- boost::shared_ptr<TProtocol> /* output */) {}
+ virtual void* createContext(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {
+ (void)input;
+ (void)output;
+ return NULL;
+ }
/**
- * Called when a client has finished making requests.
+ * Called when a client has finished request-handling to delete server
+ * context.
*/
- virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
- boost::shared_ptr<TProtocol> /* output */) {}
+ virtual void deleteContext(void* serverContext,
+ boost::shared_ptr<TProtocol>input,
+ boost::shared_ptr<TProtocol>output) {
+ (void)serverContext;
+ (void)input;
+ (void)output;
+ }
+
+ /**
+ * Called when a client is about to call the processor.
+ */
+ virtual void processContext(void* serverContext,
+ boost::shared_ptr<TTransport> transport) {
+ (void)serverContext;
+ (void)transport;
+}
protected:
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 394ce21..438a587 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -63,13 +63,18 @@
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ void* connectionContext = NULL;
if (eventHandler_ != NULL) {
- eventHandler_->clientBegin(inputProtocol, outputProtocol);
+ connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
}
try {
- while (processor_->process(inputProtocol, outputProtocol)) {
- // Peek ahead, is the remote side closed?
- if (!inputTransport->peek()) {
+ for (;;) {
+ if (eventHandler_ != NULL) {
+ eventHandler_->processContext(connectionContext, client);
+ }
+ if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
+ // Peek ahead, is the remote side closed?
+ !inputProtocol->getTransport()->peek()) {
break;
}
}
@@ -79,7 +84,7 @@
cerr << "TSimpleServer exception: " << tx.what() << endl;
}
if (eventHandler_ != NULL) {
- eventHandler_->clientEnd(inputProtocol, outputProtocol);
+ eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
}
inputTransport->close();
outputTransport->close();
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 6eea3db..18319be 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -40,11 +40,13 @@
Task(TThreadPoolServer &server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output) :
+ shared_ptr<TProtocol> output,
+ shared_ptr<TTransport> transport) :
server_(server),
processor_(processor),
input_(input),
- output_(output) {
+ output_(output),
+ transport_(transport) {
}
~Task() {}
@@ -52,12 +54,17 @@
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler =
server_.getEventHandler();
+ void* connectionContext = NULL;
if (eventHandler != NULL) {
- eventHandler->clientBegin(input_, output_);
+ connectionContext = eventHandler->createContext(input_, output_);
}
try {
- while (processor_->process(input_, output_)) {
- if (!input_->getTransport()->peek()) {
+ for (;;) {
+ if (eventHandler != NULL) {
+ eventHandler->processContext(connectionContext, transport_);
+ }
+ if (!processor_->process(input_, output_, connectionContext) ||
+ !input_->getTransport()->peek()) {
break;
}
}
@@ -78,7 +85,7 @@
}
if (eventHandler != NULL) {
- eventHandler->clientEnd(input_, output_);
+ eventHandler->deleteContext(connectionContext, input_, output_);
}
try {
@@ -101,7 +108,7 @@
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocol> input_;
shared_ptr<TProtocol> output_;
-
+ shared_ptr<TTransport> transport_;
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
@@ -167,7 +174,7 @@
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index cc30f8f..11718ca 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -42,11 +42,13 @@
Task(TThreadedServer& server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output) :
+ shared_ptr<TProtocol> output,
+ shared_ptr<TTransport> transport) :
server_(server),
processor_(processor),
input_(input),
- output_(output) {
+ output_(output),
+ transport_(transport) {
}
~Task() {}
@@ -54,12 +56,17 @@
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler =
server_.getEventHandler();
+ void* connectionContext = NULL;
if (eventHandler != NULL) {
- eventHandler->clientBegin(input_, output_);
+ connectionContext = eventHandler->createContext(input_, output_);
}
try {
- while (processor_->process(input_, output_)) {
- if (!input_->getTransport()->peek()) {
+ for (;;) {
+ if (eventHandler != NULL) {
+ eventHandler->processContext(connectionContext, transport_);
+ }
+ if (!processor_->process(input_, output_, connectionContext) ||
+ !input_->getTransport()->peek()) {
break;
}
}
@@ -73,7 +80,7 @@
GlobalOutput("TThreadedServer uncaught exception.");
}
if (eventHandler != NULL) {
- eventHandler->clientEnd(input_, output_);
+ eventHandler->deleteContext(connectionContext, input_, output_);
}
try {
@@ -107,6 +114,7 @@
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocol> input_;
shared_ptr<TProtocol> output_;
+ shared_ptr<TTransport> transport_;
};
@@ -173,7 +181,8 @@
TThreadedServer::Task* task = new TThreadedServer::Task(*this,
processor_,
inputProtocol,
- outputProtocol);
+ outputProtocol,
+ client);
// Create a task
shared_ptr<Runnable> runnable =