blob: 047dd331dd3eb8adc265fdbdc3219ed29a84fd4a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <concurrency/ThreadManager.h>
#include <concurrency/PosixThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TThreadedServer.h>
#include <server/TThreadPoolServer.h>
#include <server/TNonblockingServer.h>
#include <transport/TServerSocket.h>
#include <transport/TTransportUtils.h>
#include "ThriftTest.h"
#include <iostream>
#include <stdexcept>
#include <sstream>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <signal.h>
using namespace std;
using namespace boost;
using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace thrift::test;
class TestHandler : public ThriftTestIf {
public:
TestHandler() {}
void testVoid() {
printf("testVoid()\n");
}
void testString(string& out, const string &thing) {
printf("testString(\"%s\")\n", thing.c_str());
out = thing;
}
int8_t testByte(const int8_t thing) {
printf("testByte(%d)\n", (int)thing);
return thing;
}
int32_t testI32(const int32_t thing) {
printf("testI32(%d)\n", thing);
return thing;
}
int64_t testI64(const int64_t thing) {
printf("testI64(%"PRId64")\n", thing);
return thing;
}
double testDouble(const double thing) {
printf("testDouble(%lf)\n", thing);
return thing;
}
void testStruct(Xtruct& out, const Xtruct &thing) {
printf("testStruct({\"%s\", %d, %d, %"PRId64"})\n", thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing);
out = thing;
}
void testNest(Xtruct2& out, const Xtruct2& nest) {
const Xtruct &thing = nest.struct_thing;
printf("testNest({%d, {\"%s\", %d, %d, %"PRId64"}, %d})\n", (int)nest.byte_thing, thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing, nest.i32_thing);
out = nest;
}
void testMap(map<int32_t, int32_t> &out, const map<int32_t, int32_t> &thing) {
printf("testMap({");
map<int32_t, int32_t>::const_iterator m_iter;
bool first = true;
for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) {
if (first) {
first = false;
} else {
printf(", ");
}
printf("%d => %d", m_iter->first, m_iter->second);
}
printf("})\n");
out = thing;
}
void testSet(set<int32_t> &out, const set<int32_t> &thing) {
printf("testSet({");
set<int32_t>::const_iterator s_iter;
bool first = true;
for (s_iter = thing.begin(); s_iter != thing.end(); ++s_iter) {
if (first) {
first = false;
} else {
printf(", ");
}
printf("%d", *s_iter);
}
printf("})\n");
out = thing;
}
void testList(vector<int32_t> &out, const vector<int32_t> &thing) {
printf("testList({");
vector<int32_t>::const_iterator l_iter;
bool first = true;
for (l_iter = thing.begin(); l_iter != thing.end(); ++l_iter) {
if (first) {
first = false;
} else {
printf(", ");
}
printf("%d", *l_iter);
}
printf("})\n");
out = thing;
}
Numberz::type testEnum(const Numberz::type thing) {
printf("testEnum(%d)\n", thing);
return thing;
}
UserId testTypedef(const UserId thing) {
printf("testTypedef(%"PRId64")\n", thing);
return thing;
}
void testMapMap(map<int32_t, map<int32_t,int32_t> > &mapmap, const int32_t hello) {
printf("testMapMap(%d)\n", hello);
map<int32_t,int32_t> pos;
map<int32_t,int32_t> neg;
for (int i = 1; i < 5; i++) {
pos.insert(make_pair(i,i));
neg.insert(make_pair(-i,-i));
}
mapmap.insert(make_pair(4, pos));
mapmap.insert(make_pair(-4, neg));
}
void testInsanity(map<UserId, map<Numberz::type,Insanity> > &insane, const Insanity &argument) {
printf("testInsanity()\n");
Xtruct hello;
hello.string_thing = "Hello2";
hello.byte_thing = 2;
hello.i32_thing = 2;
hello.i64_thing = 2;
Xtruct goodbye;
goodbye.string_thing = "Goodbye4";
goodbye.byte_thing = 4;
goodbye.i32_thing = 4;
goodbye.i64_thing = 4;
Insanity crazy;
crazy.userMap.insert(make_pair(Numberz::EIGHT, 8));
crazy.xtructs.push_back(goodbye);
Insanity looney;
crazy.userMap.insert(make_pair(Numberz::FIVE, 5));
crazy.xtructs.push_back(hello);
map<Numberz::type, Insanity> first_map;
map<Numberz::type, Insanity> second_map;
first_map.insert(make_pair(Numberz::TWO, crazy));
first_map.insert(make_pair(Numberz::THREE, crazy));
second_map.insert(make_pair(Numberz::SIX, looney));
insane.insert(make_pair(1, first_map));
insane.insert(make_pair(2, second_map));
printf("return");
printf(" = {");
map<UserId, map<Numberz::type,Insanity> >::const_iterator i_iter;
for (i_iter = insane.begin(); i_iter != insane.end(); ++i_iter) {
printf("%"PRId64" => {", i_iter->first);
map<Numberz::type,Insanity>::const_iterator i2_iter;
for (i2_iter = i_iter->second.begin();
i2_iter != i_iter->second.end();
++i2_iter) {
printf("%d => {", i2_iter->first);
map<Numberz::type, UserId> userMap = i2_iter->second.userMap;
map<Numberz::type, UserId>::const_iterator um;
printf("{");
for (um = userMap.begin(); um != userMap.end(); ++um) {
printf("%d => %"PRId64", ", um->first, um->second);
}
printf("}, ");
vector<Xtruct> xtructs = i2_iter->second.xtructs;
vector<Xtruct>::const_iterator x;
printf("{");
for (x = xtructs.begin(); x != xtructs.end(); ++x) {
printf("{\"%s\", %d, %d, %"PRId64"}, ", x->string_thing.c_str(), (int)x->byte_thing, x->i32_thing, x->i64_thing);
}
printf("}");
printf("}, ");
}
printf("}, ");
}
printf("}\n");
}
void testMulti(Xtruct &hello, const int8_t arg0, const int32_t arg1, const int64_t arg2, const std::map<int16_t, std::string> &arg3, const Numberz::type arg4, const UserId arg5) {
printf("testMulti()\n");
hello.string_thing = "Hello2";
hello.byte_thing = arg0;
hello.i32_thing = arg1;
hello.i64_thing = (int64_t)arg2;
}
void testException(const std::string &arg)
throw(Xception, apache::thrift::TException)
{
printf("testException(%s)\n", arg.c_str());
if (arg.compare("Xception") == 0) {
Xception e;
e.errorCode = 1001;
e.message = arg;
throw e;
} else if (arg.compare("ApplicationException") == 0) {
apache::thrift::TException e;
throw e;
} else {
Xtruct result;
result.string_thing = arg;
return;
}
}
void testMultiException(Xtruct &result, const std::string &arg0, const std::string &arg1) throw(Xception, Xception2) {
printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str());
if (arg0.compare("Xception") == 0) {
Xception e;
e.errorCode = 1001;
e.message = "This is an Xception";
throw e;
} else if (arg0.compare("Xception2") == 0) {
Xception2 e;
e.errorCode = 2002;
e.struct_thing.string_thing = "This is an Xception2";
throw e;
} else {
result.string_thing = arg1;
return;
}
}
void testOneway(int sleepFor) {
printf("testOneway(%d): Sleeping...\n", sleepFor);
sleep(sleepFor);
printf("testOneway(%d): done sleeping!\n", sleepFor);
}
};
class TestProcessorEventHandler : public TProcessorEventHandler {
virtual void* getContext(const char* fn_name, void* serverContext) {
return new std::string(fn_name);
}
virtual void freeContext(void* ctx, const char* fn_name) {
delete static_cast<std::string*>(ctx);
}
virtual void preRead(void* ctx, const char* fn_name) {
communicate("preRead", ctx, fn_name);
}
virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
communicate("postRead", ctx, fn_name);
}
virtual void preWrite(void* ctx, const char* fn_name) {
communicate("preWrite", ctx, fn_name);
}
virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
communicate("postWrite", ctx, fn_name);
}
virtual void asyncComplete(void* ctx, const char* fn_name) {
communicate("asyncComplete", ctx, fn_name);
}
virtual void handlerError(void* ctx, const char* fn_name) {
communicate("handlerError", ctx, fn_name);
}
void communicate(const char* event, void* ctx, const char* fn_name) {
std::cout << event << ": " << *static_cast<std::string*>(ctx) << " = " << fn_name << std::endl;
}
};
int main(int argc, char **argv) {
int port = 9090;
string serverType = "simple";
string protocolType = "binary";
size_t workerCount = 4;
bool ssl = false;
ostringstream usage;
usage <<
argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--processor-events]" << endl <<
"\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl <<
"\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
"\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
map<string, string> args;
for (int ix = 1; ix < argc; ix++) {
string arg(argv[ix]);
if (arg.compare(0,2, "--") == 0) {
size_t end = arg.find_first_of("=", 2);
if (end != string::npos) {
args[string(arg, 2, end - 2)] = string(arg, end + 1);
} else {
args[string(arg, 2)] = "true";
}
} else {
throw invalid_argument("Unexcepted command line token: "+arg);
}
}
try {
if (!args["port"].empty()) {
port = atoi(args["port"].c_str());
}
if (!args["server-type"].empty()) {
serverType = args["server-type"];
if (serverType == "simple") {
} else if (serverType == "thread-pool") {
} else if (serverType == "threaded") {
} else if (serverType == "nonblocking") {
} else {
throw invalid_argument("Unknown server type "+serverType);
}
}
if (!args["protocol-type"].empty()) {
protocolType = args["protocol-type"];
if (protocolType == "binary") {
} else if (protocolType == "ascii") {
throw invalid_argument("ASCII protocol not supported");
} else if (protocolType == "xml") {
throw invalid_argument("XML protocol not supported");
} else {
throw invalid_argument("Unknown protocol type "+protocolType);
}
}
if (!args["workers"].empty()) {
workerCount = atoi(args["workers"].c_str());
}
} catch (std::exception& e) {
cerr << e.what() << endl;
cerr << usage;
}
if (args["ssl"] == "true") {
ssl = true;
signal(SIGPIPE, SIG_IGN);
}
// Dispatcher
shared_ptr<TProtocolFactory> protocolFactory(
new TBinaryProtocolFactoryT<TBufferBase>());
shared_ptr<TestHandler> testHandler(new TestHandler());
shared_ptr<TProcessor> testProcessor(
new ThriftTestProcessorT< TBinaryProtocolT<TBufferBase> >(testHandler));
if (!args["processor-events"].empty()) {
testProcessor->setEventHandler(shared_ptr<TProcessorEventHandler>(
new TestProcessorEventHandler()));
}
// Transport
shared_ptr<TSSLSocketFactory> sslSocketFactory;
shared_ptr<TServerSocket> serverSocket;
if (ssl) {
sslSocketFactory = shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
sslSocketFactory->loadCertificate("./server-certificate.pem");
sslSocketFactory->loadPrivateKey("./server-private-key.pem");
sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
serverSocket = shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory));
} else {
serverSocket = shared_ptr<TServerSocket>(new TServerSocket(port));
}
// Factory
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
if (serverType == "simple") {
// Server
TSimpleServer simpleServer(testProcessor,
serverSocket,
transportFactory,
protocolFactory);
printf("Starting the server on port %d...\n", port);
simpleServer.serve();
} else if (serverType == "thread-pool") {
shared_ptr<ThreadManager> threadManager =
ThreadManager::newSimpleThreadManager(workerCount);
shared_ptr<PosixThreadFactory> threadFactory =
shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadManager->threadFactory(threadFactory);
threadManager->start();
TThreadPoolServer threadPoolServer(testProcessor,
serverSocket,
transportFactory,
protocolFactory,
threadManager);
printf("Starting the server on port %d...\n", port);
threadPoolServer.serve();
} else if (serverType == "threaded") {
TThreadedServer threadedServer(testProcessor,
serverSocket,
transportFactory,
protocolFactory);
printf("Starting the server on port %d...\n", port);
threadedServer.serve();
} else if (serverType == "nonblocking") {
TNonblockingServer nonblockingServer(testProcessor, port);
printf("Starting the nonblocking server on port %d...\n", port);
nonblockingServer.serve();
}
printf("done.\n");
return 0;
}