|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | * or more contributor license agreements. See the NOTICE file | 
|  | * distributed with this work for additional information | 
|  | * regarding copyright ownership. The ASF licenses this file | 
|  | * to you under the Apache License, Version 2.0 (the | 
|  | * "License"); you may not use this file except in compliance | 
|  | * with the License. You may obtain a copy of the License at | 
|  | * | 
|  | *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, | 
|  | * software distributed under the License is distributed on an | 
|  | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | * KIND, either express or implied. See the License for the | 
|  | * specific language governing permissions and limitations | 
|  | * under the License. | 
|  | */ | 
|  |  | 
|  | #define BOOST_TEST_MODULE TServerIntegrationTest | 
|  | #include <atomic> | 
|  | #include <boost/test/unit_test.hpp> | 
|  | #include <boost/date_time/posix_time/ptime.hpp> | 
|  | #include <boost/foreach.hpp> | 
|  | #include <boost/format.hpp> | 
|  | #include <boost/thread.hpp> | 
|  | #include <thrift/server/TSimpleServer.h> | 
|  | #include <thrift/server/TThreadPoolServer.h> | 
|  | #include <thrift/server/TThreadedServer.h> | 
|  | #include <memory> | 
|  | #include <thrift/protocol/TBinaryProtocol.h> | 
|  | #include <thrift/transport/TServerSocket.h> | 
|  | #include <thrift/transport/TSocket.h> | 
|  | #include <thrift/transport/TTransport.h> | 
|  | #include "gen-cpp/ParentService.h" | 
|  | #include <string> | 
|  | #include <vector> | 
|  |  | 
|  | using apache::thrift::concurrency::Guard; | 
|  | using apache::thrift::concurrency::Monitor; | 
|  | using apache::thrift::concurrency::Mutex; | 
|  | using apache::thrift::concurrency::Synchronized; | 
|  | using apache::thrift::protocol::TBinaryProtocol; | 
|  | using apache::thrift::protocol::TBinaryProtocolFactory; | 
|  | using apache::thrift::protocol::TProtocol; | 
|  | using apache::thrift::protocol::TProtocolFactory; | 
|  | using apache::thrift::transport::TServerSocket; | 
|  | using apache::thrift::transport::TServerTransport; | 
|  | using apache::thrift::transport::TSocket; | 
|  | using apache::thrift::transport::TTransport; | 
|  | using apache::thrift::transport::TTransportException; | 
|  | using apache::thrift::transport::TTransportFactory; | 
|  | using apache::thrift::server::TServer; | 
|  | using apache::thrift::server::TServerEventHandler; | 
|  | using apache::thrift::server::TSimpleServer; | 
|  | using apache::thrift::server::TThreadPoolServer; | 
|  | using apache::thrift::server::TThreadedServer; | 
|  | using std::dynamic_pointer_cast; | 
|  | using std::make_shared; | 
|  | using std::shared_ptr; | 
|  | using apache::thrift::test::ParentServiceClient; | 
|  | using apache::thrift::test::ParentServiceIf; | 
|  | using apache::thrift::test::ParentServiceIfFactory; | 
|  | using apache::thrift::test::ParentServiceIfSingletonFactory; | 
|  | using apache::thrift::test::ParentServiceProcessor; | 
|  | using apache::thrift::test::ParentServiceProcessorFactory; | 
|  | using apache::thrift::TProcessor; | 
|  | using apache::thrift::TProcessorFactory; | 
|  | using boost::posix_time::milliseconds; | 
|  |  | 
|  | /** | 
|  | * preServe runs after listen() is successful, when we can connect | 
|  | */ | 
|  | class TServerReadyEventHandler : public TServerEventHandler, public Monitor { | 
|  | public: | 
|  | TServerReadyEventHandler() : isListening_(false), accepted_(0) {} | 
|  | ~TServerReadyEventHandler() override = default; | 
|  | void preServe() override { | 
|  | Synchronized sync(*this); | 
|  | isListening_ = true; | 
|  | notify(); | 
|  | } | 
|  | void* createContext(shared_ptr<TProtocol> input, | 
|  | shared_ptr<TProtocol> output) override { | 
|  | Synchronized sync(*this); | 
|  | ++accepted_; | 
|  | notify(); | 
|  |  | 
|  | (void)input; | 
|  | (void)output; | 
|  | return nullptr; | 
|  | } | 
|  | bool isListening() const { return isListening_; } | 
|  | uint64_t acceptedCount() const { return accepted_; } | 
|  |  | 
|  | private: | 
|  | bool isListening_; | 
|  | uint64_t accepted_; | 
|  | }; | 
|  |  | 
|  | /** | 
|  | * Reusing another generated test, just something to serve up | 
|  | */ | 
|  | class ParentHandler : public ParentServiceIf { | 
|  | public: | 
|  | ParentHandler() : generation_(0) {} | 
|  |  | 
|  | int32_t incrementGeneration() override { | 
|  | Guard g(mutex_); | 
|  | return ++generation_; | 
|  | } | 
|  |  | 
|  | int32_t getGeneration() override { | 
|  | Guard g(mutex_); | 
|  | return generation_; | 
|  | } | 
|  |  | 
|  | void addString(const std::string& s) override { | 
|  | Guard g(mutex_); | 
|  | strings_.push_back(s); | 
|  | } | 
|  |  | 
|  | void getStrings(std::vector<std::string>& _return) override { | 
|  | Guard g(mutex_); | 
|  | _return = strings_; | 
|  | } | 
|  |  | 
|  | void getDataWait(std::string& _return, const int32_t length) override { | 
|  | THRIFT_UNUSED_VARIABLE(_return); | 
|  | THRIFT_UNUSED_VARIABLE(length); | 
|  | } | 
|  |  | 
|  | void onewayWait() override {} | 
|  |  | 
|  | void exceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); } | 
|  |  | 
|  | void unexpectedExceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); } | 
|  |  | 
|  | protected: | 
|  | Mutex mutex_; | 
|  | int32_t generation_; | 
|  | std::vector<std::string> strings_; | 
|  | }; | 
|  |  | 
|  | void autoSocketCloser(TSocket* pSock) { | 
|  | pSock->close(); | 
|  | delete pSock; | 
|  | } | 
|  |  | 
|  | template <class TServerType> | 
|  | class TServerIntegrationTestFixture { | 
|  | public: | 
|  | TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory>& _processorFactory) | 
|  | : pServer(new TServerType(_processorFactory, | 
|  | shared_ptr<TServerTransport>( | 
|  | new TServerSocket("localhost", 0)), | 
|  | shared_ptr<TTransportFactory>(new TTransportFactory), | 
|  | shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))), | 
|  | pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)), | 
|  | bStressDone(false), | 
|  | bStressConnectionCount(0), | 
|  | bStressRequestCount(0) { | 
|  | pServer->setServerEventHandler(pEventHandler); | 
|  | } | 
|  |  | 
|  | TServerIntegrationTestFixture(const shared_ptr<TProcessor>& _processor) | 
|  | : pServer( | 
|  | new TServerType(_processor, | 
|  | shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)), | 
|  | shared_ptr<TTransportFactory>(new TTransportFactory), | 
|  | shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))), | 
|  | pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)), | 
|  | bStressDone(false), | 
|  | bStressConnectionCount(0), | 
|  | bStressRequestCount(0) { | 
|  | pServer->setServerEventHandler(pEventHandler); | 
|  | } | 
|  |  | 
|  | void startServer() { | 
|  | pServerThread.reset(new boost::thread(std::bind(&TServerType::serve, pServer.get()))); | 
|  |  | 
|  | // block until listen() completes so clients will be able to connect | 
|  | Synchronized sync(*(pEventHandler.get())); | 
|  | while (!pEventHandler->isListening()) { | 
|  | pEventHandler->wait(); | 
|  | } | 
|  |  | 
|  | BOOST_TEST_MESSAGE("  server is listening"); | 
|  | } | 
|  |  | 
|  | void blockUntilAccepted(uint64_t numAccepted) { | 
|  | Synchronized sync(*(pEventHandler.get())); | 
|  | while (pEventHandler->acceptedCount() < numAccepted) { | 
|  | pEventHandler->wait(); | 
|  | } | 
|  |  | 
|  | BOOST_TEST_MESSAGE(boost::format("  server has accepted %1%") % numAccepted); | 
|  | } | 
|  |  | 
|  | void stopServer() { | 
|  | if (pServerThread) { | 
|  | pServer->stop(); | 
|  | BOOST_TEST_MESSAGE("  server stop completed"); | 
|  |  | 
|  | pServerThread->join(); | 
|  | BOOST_TEST_MESSAGE("  server thread joined"); | 
|  | pServerThread.reset(); | 
|  | } | 
|  | } | 
|  |  | 
|  | ~TServerIntegrationTestFixture() { stopServer(); } | 
|  |  | 
|  | /** | 
|  | * Performs a baseline test where some clients are opened and issue a single operation | 
|  | * and then disconnect at different intervals. | 
|  | * \param[in]  numToMake  the number of concurrent clients | 
|  | * \param[in]  expectedHWM  the high water mark we expect of concurrency | 
|  | * \param[in]  purpose  a description of the test for logging purposes | 
|  | */ | 
|  | void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) { | 
|  | BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM") | 
|  | % typeid(TServerType).name() % purpose % numToMake % expectedHWM); | 
|  |  | 
|  | startServer(); | 
|  |  | 
|  | std::vector<shared_ptr<TSocket> > holdSockets; | 
|  | std::vector<shared_ptr<boost::thread> > holdThreads; | 
|  |  | 
|  | for (int64_t i = 0; i < numToMake; ++i) { | 
|  | shared_ptr<TSocket> pClientSock(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | holdSockets.push_back(pClientSock); | 
|  | shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock)); | 
|  | ParentServiceClient client(pClientProtocol); | 
|  | pClientSock->open(); | 
|  | client.incrementGeneration(); | 
|  | holdThreads.push_back(shared_ptr<boost::thread>( | 
|  | new boost::thread(std::bind(&TServerIntegrationTestFixture::delayClose, | 
|  | this, | 
|  | pClientSock, | 
|  | milliseconds(10 * numToMake))))); | 
|  | } | 
|  |  | 
|  | BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM()); | 
|  |  | 
|  | BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); } | 
|  | holdThreads.clear(); | 
|  | holdSockets.clear(); | 
|  |  | 
|  | stopServer(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Helper method used to close a connection after a delay. | 
|  | * \param[in]  toClose  the connection to close | 
|  | * \param[in]  after  the delay to impose | 
|  | */ | 
|  | void delayClose(shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) { | 
|  | boost::this_thread::sleep(after); | 
|  | toClose->close(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * \returns  the server port number | 
|  | */ | 
|  | int getServerPort() { | 
|  | auto* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get()); | 
|  | if (!pSock) { throw std::logic_error("how come?"); } | 
|  | return pSock->getPort(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Performs a stress test by spawning threads that connect, do a number of operations | 
|  | * and disconnect, then a random delay, then do it over again.  This is done for a fixed | 
|  | * period of time to test for concurrency correctness. | 
|  | * \param[in]  numToMake  the number of concurrent clients | 
|  | */ | 
|  | void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) { | 
|  | BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds") | 
|  | % typeid(TServerType).name() % numToMake % duration.total_seconds()); | 
|  |  | 
|  | startServer(); | 
|  |  | 
|  | std::vector<shared_ptr<boost::thread> > holdThreads; | 
|  | for (int64_t i = 0; i < numToMake; ++i) { | 
|  | holdThreads.push_back(shared_ptr<boost::thread>( | 
|  | new boost::thread(std::bind(&TServerIntegrationTestFixture::stressor, this)))); | 
|  | } | 
|  |  | 
|  | boost::this_thread::sleep(duration); | 
|  | bStressDone = true; | 
|  |  | 
|  | BOOST_TEST_MESSAGE(boost::format("  serviced %1% connections (HWM %2%) totaling %3% requests") | 
|  | % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount); | 
|  |  | 
|  | BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); } | 
|  | holdThreads.clear(); | 
|  |  | 
|  | BOOST_CHECK(bStressRequestCount > 0); | 
|  |  | 
|  | stopServer(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Helper method to stress the system | 
|  | */ | 
|  | void stressor() { | 
|  | while (!bStressDone) { | 
|  | shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser); | 
|  | shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket)); | 
|  | ParentServiceClient client(pProtocol); | 
|  | pSocket->open(); | 
|  | bStressConnectionCount.fetch_add(1, std::memory_order_relaxed); | 
|  | for (int i = 0; i < rand() % 1000; ++i) { | 
|  | client.incrementGeneration(); | 
|  | bStressRequestCount.fetch_add(1, std::memory_order_relaxed); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | shared_ptr<TServerType> pServer; | 
|  | shared_ptr<TServerReadyEventHandler> pEventHandler; | 
|  | shared_ptr<boost::thread> pServerThread; | 
|  | std::atomic<bool> bStressDone; | 
|  | std::atomic<int64_t> bStressConnectionCount; | 
|  | std::atomic<int64_t> bStressRequestCount; | 
|  | }; | 
|  |  | 
|  | template <class TServerType> | 
|  | class TServerIntegrationProcessorFactoryTestFixture | 
|  | : public TServerIntegrationTestFixture<TServerType> { | 
|  | public: | 
|  | TServerIntegrationProcessorFactoryTestFixture() | 
|  | : TServerIntegrationTestFixture<TServerType>(make_shared<ParentServiceProcessorFactory>( | 
|  | make_shared<ParentServiceIfSingletonFactory>( | 
|  | make_shared<ParentHandler>()))) {} | 
|  | }; | 
|  |  | 
|  | template <class TServerType> | 
|  | class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture<TServerType> { | 
|  | public: | 
|  | TServerIntegrationProcessorTestFixture() | 
|  | : TServerIntegrationTestFixture<TServerType>( | 
|  | make_shared<ParentServiceProcessor>(make_shared<ParentHandler>())) {} | 
|  | }; | 
|  |  | 
|  | BOOST_AUTO_TEST_SUITE(constructors) | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_simple_factory, | 
|  | TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) { | 
|  | baseline(3, 1, "factory"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) { | 
|  | baseline(3, 1, "processor"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threaded_factory, | 
|  | TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) { | 
|  | baseline(10, 10, "factory"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) { | 
|  | baseline(10, 10, "processor"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threaded_bound, | 
|  | TServerIntegrationProcessorTestFixture<TThreadedServer>) { | 
|  | pServer->setConcurrentClientLimit(4); | 
|  | baseline(10, 4, "limit by server framework"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threaded_stress, | 
|  | TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) { | 
|  | stress(10, boost::posix_time::seconds(3)); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threadpool_factory, | 
|  | TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>) { | 
|  | pServer->getThreadManager()->threadFactory( | 
|  | shared_ptr<apache::thrift::concurrency::ThreadFactory>( | 
|  | new apache::thrift::concurrency::ThreadFactory)); | 
|  | pServer->getThreadManager()->start(); | 
|  |  | 
|  | // thread factory has 4 threads as a default | 
|  | // thread factory however is a bad way to limit concurrent clients | 
|  | // as accept() will be called to grab a 5th client socket, in this case | 
|  | // and then the thread factory will block adding the thread to manage | 
|  | // that client. | 
|  | baseline(10, 5, "limit by thread manager"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threadpool, | 
|  | TServerIntegrationProcessorTestFixture<TThreadPoolServer>) { | 
|  | pServer->getThreadManager()->threadFactory( | 
|  | shared_ptr<apache::thrift::concurrency::ThreadFactory>( | 
|  | new apache::thrift::concurrency::ThreadFactory)); | 
|  | pServer->getThreadManager()->start(); | 
|  |  | 
|  | // thread factory has 4 threads as a default | 
|  | // thread factory however is a bad way to limit concurrent clients | 
|  | // as accept() will be called to grab a 5th client socket, in this case | 
|  | // and then the thread factory will block adding the thread to manage | 
|  | // that client. | 
|  | baseline(10, 5, "limit by thread manager"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, | 
|  | TServerIntegrationProcessorTestFixture<TThreadPoolServer>) { | 
|  | pServer->getThreadManager()->threadFactory( | 
|  | shared_ptr<apache::thrift::concurrency::ThreadFactory>( | 
|  | new apache::thrift::concurrency::ThreadFactory)); | 
|  | pServer->getThreadManager()->start(); | 
|  | pServer->setConcurrentClientLimit(4); | 
|  |  | 
|  | baseline(10, 4, "server framework connection limit"); | 
|  | } | 
|  |  | 
|  | BOOST_FIXTURE_TEST_CASE(test_threadpool_stress, | 
|  | TServerIntegrationProcessorTestFixture<TThreadPoolServer>) { | 
|  | pServer->getThreadManager()->threadFactory( | 
|  | shared_ptr<apache::thrift::concurrency::ThreadFactory>( | 
|  | new apache::thrift::concurrency::ThreadFactory)); | 
|  | pServer->getThreadManager()->start(); | 
|  |  | 
|  | stress(10, boost::posix_time::seconds(3)); | 
|  | } | 
|  |  | 
|  | BOOST_AUTO_TEST_SUITE_END() | 
|  |  | 
|  | BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest, | 
|  | TServerIntegrationProcessorTestFixture<TThreadedServer>) | 
|  |  | 
|  | BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) { | 
|  | // This tests THRIFT-2441 new behavior: stopping the server disconnects clients | 
|  | BOOST_TEST_MESSAGE("Testing stop with interruptable clients"); | 
|  |  | 
|  | startServer(); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock1->open(); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock2->open(); | 
|  |  | 
|  | // Ensure they have been accepted | 
|  | blockUntilAccepted(2); | 
|  |  | 
|  | // The test fixture destructor will force the sockets to disconnect | 
|  | // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected | 
|  | stopServer(); | 
|  |  | 
|  | // extra proof the server end disconnected the clients | 
|  | uint8_t buf[1]; | 
|  | BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected | 
|  | BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected | 
|  | } | 
|  |  | 
|  | BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) { | 
|  | // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients | 
|  | // disconnect. | 
|  | BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients"); | 
|  |  | 
|  | dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport()) | 
|  | ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior | 
|  |  | 
|  | startServer(); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock1->open(); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock2->open(); | 
|  |  | 
|  | // Ensure they have been accepted | 
|  | blockUntilAccepted(2); | 
|  |  | 
|  | boost::thread t1(std::bind(&TServerIntegrationTestFixture::delayClose, | 
|  | this, | 
|  | pClientSock1, | 
|  | milliseconds(250))); | 
|  | boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose, | 
|  | this, | 
|  | pClientSock2, | 
|  | milliseconds(250))); | 
|  |  | 
|  | // Once the clients disconnect the server will stop | 
|  | stopServer(); | 
|  | BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0); | 
|  | t1.join(); | 
|  | t2.join(); | 
|  | } | 
|  |  | 
|  | BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) { | 
|  | startServer(); | 
|  | BOOST_TEST_MESSAGE("Testing the concurrent client limit"); | 
|  |  | 
|  | BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit()); | 
|  | pServer->setConcurrentClientLimit(2); | 
|  | BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount()); | 
|  | BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit()); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock1->open(); | 
|  | blockUntilAccepted(1); | 
|  | BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount()); | 
|  |  | 
|  | shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock2->open(); | 
|  | blockUntilAccepted(2); | 
|  | BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); | 
|  |  | 
|  | // a third client cannot connect until one of the other two closes | 
|  | boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose, | 
|  | this, | 
|  | pClientSock2, | 
|  | milliseconds(250))); | 
|  | shared_ptr<TSocket> pClientSock3(new TSocket("localhost", getServerPort()), | 
|  | autoSocketCloser); | 
|  | pClientSock2->open(); | 
|  | blockUntilAccepted(2); | 
|  | BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); | 
|  | BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM()); | 
|  |  | 
|  | stopServer(); | 
|  | BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0); | 
|  | t2.join(); | 
|  | } | 
|  |  | 
|  | BOOST_AUTO_TEST_SUITE_END() |