blob: 9edeb19944f230b9a44c36b591fa55e4b56bf542 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <thrift/server/TThreadedServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransport.h>
#include "gen-cpp/ParentService.h"
#include "TestPortFixture.h"
#include <vector>
using apache::thrift::concurrency::Guard;
using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Synchronized;
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::protocol::TBinaryProtocolFactory;
using apache::thrift::protocol::TProtocol;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportFactory;
using apache::thrift::server::TServerEventHandler;
using apache::thrift::server::TThreadedServer;
using apache::thrift::test::ParentServiceClient;
using apache::thrift::test::ParentServiceIf;
using apache::thrift::test::ParentServiceProcessor;
/**
* preServe runs after listen() is successful, when we can connect
*/
class TServerReadyEventHandler : public TServerEventHandler, public Monitor
{
public:
TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
virtual ~TServerReadyEventHandler() {}
virtual void preServe() {
Synchronized sync(*this);
isListening_ = true;
notify();
}
virtual void* createContext(boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output) {
Synchronized sync(*this);
++accepted_;
notify();
(void)input;
(void)output;
return NULL;
}
bool isListening() const { return isListening_; }
uint64_t acceptedCount() const { return accepted_; }
private:
bool isListening_;
uint64_t accepted_;
};
class ParentHandler : virtual public ParentServiceIf {
public:
ParentHandler() : generation_(0) {}
int32_t incrementGeneration() {
Guard g(mutex_);
return ++generation_;
}
int32_t getGeneration() {
Guard g(mutex_);
return generation_;
}
void addString(const std::string& s) {
Guard g(mutex_);
strings_.push_back(s);
}
void getStrings(std::vector<std::string>& _return) {
Guard g(mutex_);
_return = strings_;
}
void getDataWait(std::string& _return, int32_t length) {
}
void onewayWait() {
}
void exceptionWait(const std::string& message) {
}
void unexpectedExceptionWait(const std::string& message) {
}
protected:
Mutex mutex_;
int32_t generation_;
std::vector<std::string> strings_;
};
class TServerIntegrationTestFixture : public TestPortFixture
{
public:
TServerIntegrationTestFixture() :
pServer(new TThreadedServer(
boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
boost::shared_ptr<TTransportFactory>(new TTransportFactory),
boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
{
pServer->setServerEventHandler(pEventHandler);
}
void startServer() {
pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
// block until listen() completes so clients will be able to connect
Synchronized sync(*(pEventHandler.get()));
while (!pEventHandler->isListening()) {
pEventHandler->wait();
}
BOOST_MESSAGE("server is listening");
}
void blockUntilAccepted(uint64_t numAccepted) {
Synchronized sync(*(pEventHandler.get()));
while (pEventHandler->acceptedCount() < numAccepted) {
pEventHandler->wait();
}
BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
}
void stopServer() {
pServer->stop();
BOOST_MESSAGE("server stop completed");
pServerThread->join();
BOOST_MESSAGE("server thread joined");
}
~TServerIntegrationTestFixture() {
stopServer();
}
void delayClose(boost::shared_ptr<TTransport> toClose) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
toClose->close();
}
boost::shared_ptr<TThreadedServer> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
};
BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
{
// this test establishes some basic sanity
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
ParentServiceClient client1(pClientProtocol1);
pClientSock1->open();
client1.incrementGeneration();
pClientSock1->close();
stopServer();
}
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
{
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
pClientSock1->open();
boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
// The test fixture destructor will force the sockets to disconnect
// Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
stopServer();
// extra proof the server end disconnected the clients
uint8_t buf[1];
BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
pClientSock1->close();
pClientSock2->close();
}
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
{
// This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
// disconnect.
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
startServer();
boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
pClientSock1->open();
boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
// Once the clients disconnect the server will stop
stopServer();
pClientSock1->close();
pClientSock2->close();
}
BOOST_AUTO_TEST_SUITE_END()