THRIFT-1902 C++: Support for Multiplexing Services on any Transport, Protocol and Server
Patch: Patrik Lindblom
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index f40cdb1..05caf8c 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -63,6 +63,7 @@
src/thrift/protocol/TDenseProtocol.cpp \
src/thrift/protocol/TJSONProtocol.cpp \
src/thrift/protocol/TBase64Utils.cpp \
+ src/thrift/protocol/TMultiplexedProtocol.cpp \
src/thrift/transport/TTransportException.cpp \
src/thrift/transport/TFDTransport.cpp \
src/thrift/transport/TFileTransport.cpp \
@@ -155,6 +156,8 @@
src/thrift/protocol/TDebugProtocol.h \
src/thrift/protocol/TBase64Utils.h \
src/thrift/protocol/TJSONProtocol.h \
+ src/thrift/protocol/TMultiplexedProtocol.h \
+ src/thrift/protocol/TProtocolDecorator.h \
src/thrift/protocol/TProtocolTap.h \
src/thrift/protocol/TProtocolException.h \
src/thrift/protocol/TVirtualProtocol.h \
@@ -195,7 +198,8 @@
include_processordir = $(include_thriftdir)/processor
include_processor_HEADERS = \
src/thrift/processor/PeekProcessor.h \
- src/thrift/processor/StatsProcessor.h
+ src/thrift/processor/StatsProcessor.h \
+ src/thrift/processor/TMultiplexedProcessor.h
include_asyncdir = $(include_thriftdir)/async
include_async_HEADERS = \
diff --git a/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
new file mode 100644
index 0000000..494ec10
--- /dev/null
+++ b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
+#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
+
+#include <thrift/protocol/TProtocolDecorator.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/TProcessor.h>
+#include <boost/tokenizer.hpp>
+
+namespace apache
+{
+ namespace thrift
+ {
+ using boost::shared_ptr;
+
+ namespace protocol {
+
+ /**
+ * To be able to work with any protocol, we needed
+ * to allow them to call readMessageBegin() and get a TMessage in exactly
+ * the standard format, without the service name prepended to TMessage.name.
+ */
+ class StoredMessageProtocol : public TProtocolDecorator
+ {
+ public:
+ StoredMessageProtocol( shared_ptr<protocol::TProtocol> _protocol,
+ const std::string& _name, const TMessageType _type,
+ const int32_t _seqid) :
+ TProtocolDecorator(_protocol),
+ name(_name),
+ type(_type),
+ seqid(_seqid)
+ {
+ }
+
+ uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid)
+ {
+
+ _name = name;
+ _type = type;
+ _seqid = seqid;
+
+ return 0; // (Normal TProtocol read functions return number of bytes read)
+ }
+
+ std::string name;
+ TMessageType type;
+ int32_t seqid;
+ };
+ } //namespace protocol
+
+ /**
+ * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
+ * a single <code>TServer</code> to provide multiple services.
+ *
+ * <p>To do so, you instantiate the processor and then register additional
+ * processors with it, as shown in the following example:</p>
+ *
+ * <blockquote><code>
+ * shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
+ *
+ * processor->registerProcessor(
+ * "Calculator",
+ * shared_ptr<TProcessor>( new CalculatorProcessor(
+ * shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
+ *
+ * processor->registerProcessor(
+ * "WeatherReport",
+ * shared_ptr<TProcessor>( new WeatherReportProcessor(
+ * shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
+ *
+ * shared_ptr<TServerTransport> transport(new TServerSocket(9090));
+ * TSimpleServer server(processor, transport);
+ *
+ * server.serve();
+ * </code></blockquote>
+ */
+ class TMultiplexedProcessor : public TProcessor
+ {
+ public:
+ typedef std::map< std::string, shared_ptr<TProcessor> > services_t;
+
+ /**
+ * 'Register' a service with this <code>TMultiplexedProcessor</code>. This
+ * allows us to broker requests to individual services by using the service
+ * name to select them at request time.
+ *
+ * \param [in] serviceName Name of a service, has to be identical to the name
+ * declared in the Thrift IDL, e.g. "WeatherReport".
+ * \param [in] processor Implementation of a service, ususally referred to
+ * as "handlers", e.g. WeatherReportHandler,
+ * implementing WeatherReportIf interface.
+ */
+ void registerProcessor( const std::string & serviceName,
+ shared_ptr<TProcessor> processor )
+ {
+ services[serviceName] = processor;
+ }
+
+ /**
+ * This implementation of <code>process</code> performs the following steps:
+ *
+ * <ol>
+ * <li>Read the beginning of the message.</li>
+ * <li>Extract the service name from the message.</li>
+ * <li>Using the service name to locate the appropriate processor.</li>
+ * <li>Dispatch to the processor, with a decorated instance of TProtocol
+ * that allows readMessageBegin() to return the original TMessage.</li>
+ * </ol>
+ *
+ * \throws TException If the message type is not T_CALL or T_ONEWAY, if
+ * the service name was not found in the message, or if the service
+ * name was not found in the service map.
+ */
+ bool process( shared_ptr<protocol::TProtocol> in,
+ shared_ptr<protocol::TProtocol> out,
+ void *connectionContext)
+ {
+ std::string name;
+ protocol::TMessageType type;
+ int32_t seqid;
+
+ // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+ // message header. This pulls the message "off the wire", which we'll
+ // deal with at the end of this method.
+ in->readMessageBegin(name, type, seqid);
+
+ if( type != protocol::T_CALL && type != protocol::T_ONEWAY ) {
+ // Unexpected message type.
+ in->skip(::apache::thrift::protocol::T_STRUCT);
+ in->readMessageEnd();
+ in->getTransport()->readEnd();
+ const std::string msg("TMultiplexedProcessor: Unexpected message type");
+ ::apache::thrift::TApplicationException x(
+ ::apache::thrift::TApplicationException::PROTOCOL_ERROR,
+ msg);
+ out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(out.get());
+ out->writeMessageEnd();
+ out->getTransport()->writeEnd();
+ out->getTransport()->flush();
+ throw TException(msg);
+ }
+
+ // Extract the service name
+
+ boost::tokenizer<boost::char_separator<char> > tok( name, boost::char_separator<char>(":") );
+
+ std::vector<std::string> tokens;
+ std::copy( tok.begin(), tok.end(), std::back_inserter(tokens) );
+
+ // A valid message should consist of two tokens: the service
+ // name and the name of the method to call.
+ if( tokens.size() == 2 )
+ {
+ // Search for a processor associated with this service name.
+ services_t::iterator it = services.find(tokens[0]);
+
+ if( it != services.end() )
+ {
+ shared_ptr<TProcessor> processor = it->second;
+ // Let the processor registered for this service name
+ // process the message.
+ return processor->process(
+ shared_ptr<protocol::TProtocol>(
+ new protocol::StoredMessageProtocol( in, tokens[1], type, seqid ) ),
+ out, connectionContext );
+ }
+ else
+ {
+ // Unknown service.
+ in->skip(::apache::thrift::protocol::T_STRUCT);
+ in->readMessageEnd();
+ in->getTransport()->readEnd();
+
+ std::string msg("TMultiplexedProcessor: Unknown service: ");
+ msg += tokens[0];
+ ::apache::thrift::TApplicationException x(
+ ::apache::thrift::TApplicationException::PROTOCOL_ERROR,
+ msg);
+ out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(out.get());
+ out->writeMessageEnd();
+ out->getTransport()->writeEnd();
+ out->getTransport()->flush();
+ msg += ". Did you forget to call registerProcessor()?";
+ throw TException(msg);
+ }
+ }
+ return false;
+ }
+
+ private:
+ /** Map of service processor objects, indexed by service names. */
+ services_t services;
+ };
+ }
+}
+
+#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_
diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp
new file mode 100644
index 0000000..a17eacc
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/protocol/TMultiplexedProtocol.h>
+#include <thrift/processor/TMultiplexedProcessor.h>
+#include <thrift/protocol/TProtocolDecorator.h>
+
+namespace apache
+{
+ namespace thrift
+ {
+ namespace protocol
+ {
+ uint32_t TMultiplexedProtocol::writeMessageBegin_virt(
+ const std::string& _name,
+ const TMessageType _type,
+ const int32_t _seqid)
+ {
+ if( _type == T_CALL || _type == T_ONEWAY )
+ {
+ return TProtocolDecorator::writeMessageBegin_virt( serviceName + separator + _name, _type, _seqid );
+ }
+ else
+ {
+ return TProtocolDecorator::writeMessageBegin_virt(_name, _type, _seqid);
+ }
+ }
+ }
+ }
+}
+
diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h
new file mode 100644
index 0000000..a59c7b4
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TMULTIPLEXEDPROTOCOL_H_
+#define THRIFT_TMULTIPLEXEDPROTOCOL_H_ 1
+
+#include <thrift/protocol/TProtocolDecorator.h>
+
+namespace apache
+{
+ namespace thrift
+ {
+ namespace protocol
+ {
+ using boost::shared_ptr;
+
+ /**
+ * <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
+ * that allows a Thrift client to communicate with a multiplexing Thrift server,
+ * by prepending the service name to the function name during function calls.
+ *
+ * \note THIS IS NOT USED BY SERVERS. On the server, use
+ * {@link apache::thrift::TMultiplexedProcessor TMultiplexedProcessor} to handle requests
+ * from a multiplexing client.
+ *
+ * This example uses a single socket transport to invoke two services:
+ *
+ * <blockquote><code>
+ * shared_ptr<TSocket> transport(new TSocket("localhost", 9090));
+ * transport->open();
+ *
+ * shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ *
+ * shared_ptr<TMultiplexedProtocol> mp1(new TMultiplexedProtocol(protocol, "Calculator"));
+ * shared_ptr<CalculatorClient> service1(new CalculatorClient(mp1));
+ *
+ * shared_ptr<TMultiplexedProtocol> mp2(new TMultiplexedProtocol(protocol, "WeatherReport"));
+ * shared_ptr<WeatherReportClient> service2(new WeatherReportClient(mp2));
+ *
+ * service1->add(2,2);
+ * int temp = service2->getTemperature();
+ * </code></blockquote>
+ *
+ * @see apache::thrift::protocol::TProtocolDecorator
+ */
+ class TMultiplexedProtocol : public TProtocolDecorator
+ {
+ public:
+ /**
+ * Wrap the specified protocol, allowing it to be used to communicate with a
+ * multiplexing server. The <code>serviceName</code> is required as it is
+ * prepended to the message header so that the multiplexing server can broker
+ * the function call to the proper service.
+ *
+ * \param _protocol Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
+ * \param _serviceName The service name of the service communicating via this protocol.
+ */
+ TMultiplexedProtocol( shared_ptr<TProtocol> _protocol, const std::string& _serviceName )
+ : TProtocolDecorator(_protocol),
+ serviceName(_serviceName),
+ separator(":")
+ { }
+ virtual ~TMultiplexedProtocol() {}
+
+ /**
+ * Prepends the service name to the function name, separated by TMultiplexedProtocol::SEPARATOR.
+ *
+ * \param [in] _name The name of the method to be called in the service.
+ * \param [in] _type The type of message
+ * \param [in] _name The sequential id of the message
+ *
+ * \throws TException Passed through from wrapped <code>TProtocol</code> instance.
+ */
+ uint32_t writeMessageBegin_virt(
+ const std::string& _name,
+ const TMessageType _type,
+ const int32_t _seqid);
+ private:
+ const std::string serviceName;
+ const std::string separator;
+ };
+
+ }
+ }
+}
+
+#endif // THRIFT_TMULTIPLEXEDPROTOCOL_H_
diff --git a/lib/cpp/src/thrift/protocol/TProtocolDecorator.h b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h
new file mode 100644
index 0000000..7850bc5
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TPROTOCOLDECORATOR_H_
+#define THRIFT_TPROTOCOLDECORATOR_H_ 1
+
+#include <thrift/protocol/TProtocol.h>
+#include <boost/shared_ptr.hpp>
+
+namespace apache
+{
+ namespace thrift
+ {
+ namespace protocol
+ {
+ using boost::shared_ptr;
+
+ /**
+ * <code>TProtocolDecorator</code> forwards all requests to an enclosed
+ * <code>TProtocol</code> instance, providing a way to author concise
+ * concrete decorator subclasses.
+ *
+ * <p>See p.175 of Design Patterns (by Gamma et al.)</p>
+ *
+ * @see apache::thrift::protocol::TMultiplexedProtocol
+ */
+ class TProtocolDecorator : public TProtocol
+ {
+ public:
+ virtual ~TProtocolDecorator() {}
+
+ // Desc: Initializes the protocol decorator object.
+ TProtocolDecorator( shared_ptr<TProtocol> proto )
+ : TProtocol(proto->getTransport()), protocol(proto)
+ {
+ }
+
+ virtual uint32_t writeMessageBegin_virt(
+ const std::string& name,
+ const TMessageType messageType,
+ const int32_t seqid)
+ {
+ return protocol->writeMessageBegin(name, messageType, seqid);
+ }
+ virtual uint32_t writeMessageEnd_virt() { return protocol->writeMessageEnd(); }
+ virtual uint32_t writeStructBegin_virt(const char* name) { return protocol->writeStructBegin(name); }
+ virtual uint32_t writeStructEnd_virt() { return protocol->writeStructEnd(); }
+
+ virtual uint32_t writeFieldBegin_virt(const char* name,
+ const TType fieldType,
+ const int16_t fieldId) { return protocol->writeFieldBegin(name,fieldType,fieldId); }
+
+ virtual uint32_t writeFieldEnd_virt() { return protocol->writeFieldEnd(); }
+ virtual uint32_t writeFieldStop_virt() { return protocol->writeFieldStop(); }
+
+ virtual uint32_t writeMapBegin_virt(const TType keyType,
+ const TType valType,
+ const uint32_t size) { return protocol->writeMapBegin(keyType,valType,size); }
+
+ virtual uint32_t writeMapEnd_virt() { return protocol->writeMapEnd(); }
+
+ virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeListBegin(elemType,size); }
+ virtual uint32_t writeListEnd_virt() { return protocol->writeListEnd(); }
+
+ virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeSetBegin(elemType,size); }
+ virtual uint32_t writeSetEnd_virt() { return protocol->writeSetEnd(); }
+
+ virtual uint32_t writeBool_virt(const bool value) { return protocol->writeBool(value); }
+ virtual uint32_t writeByte_virt(const int8_t byte) { return protocol->writeByte(byte); }
+ virtual uint32_t writeI16_virt(const int16_t i16) { return protocol->writeI16(i16); }
+ virtual uint32_t writeI32_virt(const int32_t i32) { return protocol->writeI32(i32); }
+ virtual uint32_t writeI64_virt(const int64_t i64) { return protocol->writeI64(i64); }
+
+ virtual uint32_t writeDouble_virt(const double dub) { return protocol->writeDouble(dub); }
+ virtual uint32_t writeString_virt(const std::string& str) { return protocol->writeString(str); }
+ virtual uint32_t writeBinary_virt(const std::string& str) { return protocol->writeBinary(str); }
+
+ virtual uint32_t readMessageBegin_virt(std::string& name, TMessageType& messageType, int32_t& seqid) { return protocol->readMessageBegin(name,messageType,seqid); }
+ virtual uint32_t readMessageEnd_virt() { return protocol->readMessageEnd(); }
+
+ virtual uint32_t readStructBegin_virt(std::string& name) { return protocol->readStructBegin(name); }
+ virtual uint32_t readStructEnd_virt() { return protocol->readStructEnd(); }
+
+ virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) { return protocol->readFieldBegin(name, fieldType, fieldId); }
+ virtual uint32_t readFieldEnd_virt() { return protocol->readFieldEnd(); }
+
+ virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) { return protocol->readMapBegin(keyType,valType,size); }
+ virtual uint32_t readMapEnd_virt() { return protocol->readMapEnd(); }
+
+ virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) { return protocol->readListBegin(elemType,size); }
+ virtual uint32_t readListEnd_virt() { return protocol->readListEnd(); }
+
+ virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) { return protocol->readSetBegin(elemType,size); }
+ virtual uint32_t readSetEnd_virt() { return protocol->readSetEnd(); }
+
+ virtual uint32_t readBool_virt(bool& value) { return protocol->readBool(value); }
+ virtual uint32_t readBool_virt(std::vector<bool>::reference value) { return protocol->readBool(value); }
+
+ virtual uint32_t readByte_virt(int8_t& byte) { return protocol->readByte(byte); }
+
+ virtual uint32_t readI16_virt(int16_t& i16) { return protocol->readI16(i16); }
+ virtual uint32_t readI32_virt(int32_t& i32) { return protocol->readI32(i32); }
+ virtual uint32_t readI64_virt(int64_t& i64) { return protocol->readI64(i64); }
+
+ virtual uint32_t readDouble_virt(double& dub) { return protocol->readDouble(dub); }
+
+ virtual uint32_t readString_virt(std::string& str) { return protocol->readString(str); }
+ virtual uint32_t readBinary_virt(std::string& str) { return protocol->readBinary(str); }
+
+ private:
+ shared_ptr<TProtocol> protocol;
+ };
+ }
+ }
+}
+
+#endif // THRIFT_TPROTOCOLDECORATOR_H_