THRIFT-1902 C++: Support for Multiplexing Services on any Transport, Protocol and Server
Patch: Patrik Lindblom
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index f40cdb1..05caf8c 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -63,6 +63,7 @@
                        src/thrift/protocol/TDenseProtocol.cpp \
                        src/thrift/protocol/TJSONProtocol.cpp \
                        src/thrift/protocol/TBase64Utils.cpp \
+                       src/thrift/protocol/TMultiplexedProtocol.cpp \
                        src/thrift/transport/TTransportException.cpp \
                        src/thrift/transport/TFDTransport.cpp \
                        src/thrift/transport/TFileTransport.cpp \
@@ -155,6 +156,8 @@
                          src/thrift/protocol/TDebugProtocol.h \
                          src/thrift/protocol/TBase64Utils.h \
                          src/thrift/protocol/TJSONProtocol.h \
+                         src/thrift/protocol/TMultiplexedProtocol.h \
+                         src/thrift/protocol/TProtocolDecorator.h \
                          src/thrift/protocol/TProtocolTap.h \
                          src/thrift/protocol/TProtocolException.h \
                          src/thrift/protocol/TVirtualProtocol.h \
@@ -195,7 +198,8 @@
 include_processordir = $(include_thriftdir)/processor
 include_processor_HEADERS = \
                          src/thrift/processor/PeekProcessor.h \
-                         src/thrift/processor/StatsProcessor.h
+                         src/thrift/processor/StatsProcessor.h \
+                         src/thrift/processor/TMultiplexedProcessor.h
 
 include_asyncdir = $(include_thriftdir)/async
 include_async_HEADERS = \
diff --git a/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
new file mode 100644
index 0000000..494ec10
--- /dev/null
+++ b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
+#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
+
+#include <thrift/protocol/TProtocolDecorator.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/TProcessor.h>
+#include <boost/tokenizer.hpp>
+
+namespace apache 
+{ 
+    namespace thrift 
+    { 
+        using boost::shared_ptr;
+
+        namespace protocol {
+
+            /**
+             *  To be able to work with any protocol, we needed
+             *  to allow them to call readMessageBegin() and get a TMessage in exactly
+             *  the standard format, without the service name prepended to TMessage.name.
+             */
+            class StoredMessageProtocol : public TProtocolDecorator
+            {
+            public:
+                StoredMessageProtocol( shared_ptr<protocol::TProtocol> _protocol,
+                    const std::string& _name, const TMessageType _type, 
+                    const int32_t _seqid) :
+                    TProtocolDecorator(_protocol),
+                    name(_name),
+                    type(_type),
+                    seqid(_seqid)
+                {
+                }
+
+                uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid)
+                {
+                    
+                    _name  = name;
+                    _type  = type;
+                    _seqid = seqid;
+
+                    return 0; // (Normal TProtocol read functions return number of bytes read)
+                }
+
+                std::string name;
+                TMessageType type;
+                int32_t seqid;
+            };
+        } //namespace protocol
+
+        /**
+         * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
+         * a single <code>TServer</code> to provide multiple services.
+         *
+         * <p>To do so, you instantiate the processor and then register additional
+         * processors with it, as shown in the following example:</p>
+         *
+         * <blockquote><code>
+         *     shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
+         *
+         *     processor->registerProcessor(
+         *         "Calculator",
+         *         shared_ptr<TProcessor>( new CalculatorProcessor(
+         *             shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
+         *
+         *     processor->registerProcessor(
+         *         "WeatherReport",
+         *         shared_ptr<TProcessor>( new WeatherReportProcessor(
+         *             shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
+         *
+         *     shared_ptr<TServerTransport> transport(new TServerSocket(9090));
+         *     TSimpleServer server(processor, transport);
+         *
+         *     server.serve();
+         * </code></blockquote>
+         */
+        class TMultiplexedProcessor : public TProcessor
+        {
+        public:
+            typedef std::map< std::string, shared_ptr<TProcessor> > services_t;
+
+           /**
+             * 'Register' a service with this <code>TMultiplexedProcessor</code>.  This
+             * allows us to broker requests to individual services by using the service
+             * name to select them at request time.
+             *
+             * \param [in] serviceName Name of a service, has to be identical to the name
+             *                         declared in the Thrift IDL, e.g. "WeatherReport".
+             * \param [in] processor   Implementation of a service, ususally referred to
+             *                         as "handlers", e.g. WeatherReportHandler,
+             *                         implementing WeatherReportIf interface.
+             */
+            void registerProcessor( const std::string & serviceName, 
+                                    shared_ptr<TProcessor> processor )
+            {
+                services[serviceName] = processor;
+            }
+
+            /**
+             * This implementation of <code>process</code> performs the following steps:
+             *
+             * <ol>
+             *     <li>Read the beginning of the message.</li>
+             *     <li>Extract the service name from the message.</li>
+             *     <li>Using the service name to locate the appropriate processor.</li>
+             *     <li>Dispatch to the processor, with a decorated instance of TProtocol
+             *         that allows readMessageBegin() to return the original TMessage.</li>
+             * </ol>
+             *  
+             * \throws TException If the message type is not T_CALL or T_ONEWAY, if
+             * the service name was not found in the message, or if the service
+             * name was not found in the service map. 
+             */
+            bool process( shared_ptr<protocol::TProtocol> in, 
+                          shared_ptr<protocol::TProtocol> out,
+                          void *connectionContext)
+            {
+                std::string name;
+                protocol::TMessageType type;
+                int32_t seqid;
+
+                // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+                // message header.  This pulls the message "off the wire", which we'll
+                // deal with at the end of this method.
+                in->readMessageBegin(name, type, seqid);
+
+                if( type != protocol::T_CALL && type != protocol::T_ONEWAY ) {
+                    // Unexpected message type. 
+                    in->skip(::apache::thrift::protocol::T_STRUCT);
+                    in->readMessageEnd();
+                    in->getTransport()->readEnd();
+                    const std::string msg("TMultiplexedProcessor: Unexpected message type");
+                    ::apache::thrift::TApplicationException x(
+                        ::apache::thrift::TApplicationException::PROTOCOL_ERROR, 
+                        msg);
+                    out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+                    x.write(out.get());
+                    out->writeMessageEnd();
+                    out->getTransport()->writeEnd();
+                    out->getTransport()->flush();
+                    throw TException(msg);
+                }
+
+                // Extract the service name
+                
+                boost::tokenizer<boost::char_separator<char> > tok( name, boost::char_separator<char>(":") );
+
+                std::vector<std::string> tokens;
+                std::copy( tok.begin(), tok.end(), std::back_inserter(tokens) );
+
+                // A valid message should consist of two tokens: the service
+                // name and the name of the method to call.
+                if( tokens.size() == 2 )
+                {
+                    // Search for a processor associated with this service name.
+                    services_t::iterator it = services.find(tokens[0]);
+
+                    if( it != services.end() )
+                    {
+                        shared_ptr<TProcessor> processor = it->second;
+                        // Let the processor registered for this service name 
+                        // process the message.
+                        return processor->process( 
+                            shared_ptr<protocol::TProtocol>( 
+                                new protocol::StoredMessageProtocol( in, tokens[1], type, seqid ) ), 
+                            out, connectionContext );
+                    }
+                    else
+                    {
+                        // Unknown service. 
+                        in->skip(::apache::thrift::protocol::T_STRUCT);
+                        in->readMessageEnd();
+                        in->getTransport()->readEnd();
+                        
+                        std::string msg("TMultiplexedProcessor: Unknown service: ");
+                        msg += tokens[0];
+                        ::apache::thrift::TApplicationException x(
+                            ::apache::thrift::TApplicationException::PROTOCOL_ERROR, 
+                            msg);
+                        out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+                        x.write(out.get());
+                        out->writeMessageEnd();
+                        out->getTransport()->writeEnd();
+                        out->getTransport()->flush();
+                        msg += ". Did you forget to call registerProcessor()?";
+                        throw TException(msg);
+                    }
+                }
+                return false;
+            }
+
+        private:
+            /** Map of service processor objects, indexed by service names. */
+            services_t services;
+        };
+    }
+} 
+
+#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_
diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp
new file mode 100644
index 0000000..a17eacc
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/protocol/TMultiplexedProtocol.h>
+#include <thrift/processor/TMultiplexedProcessor.h>
+#include <thrift/protocol/TProtocolDecorator.h>
+
+namespace apache 
+{ 
+    namespace thrift 
+    { 
+        namespace protocol 
+        {
+            uint32_t TMultiplexedProtocol::writeMessageBegin_virt(
+                const std::string& _name,
+                const TMessageType _type,
+                const int32_t _seqid)
+            {
+                if( _type == T_CALL || _type == T_ONEWAY )
+                {
+                    return TProtocolDecorator::writeMessageBegin_virt( serviceName + separator + _name, _type, _seqid );
+                }
+                else
+                {
+                    return TProtocolDecorator::writeMessageBegin_virt(_name, _type, _seqid);
+                }
+            }
+        }
+    }
+}
+
diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h
new file mode 100644
index 0000000..a59c7b4
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TMULTIPLEXEDPROTOCOL_H_
+#define THRIFT_TMULTIPLEXEDPROTOCOL_H_ 1
+
+#include <thrift/protocol/TProtocolDecorator.h>
+
+namespace apache 
+{ 
+    namespace thrift 
+    { 
+        namespace protocol 
+        {
+            using boost::shared_ptr;
+
+            /**
+             * <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
+             * that allows a Thrift client to communicate with a multiplexing Thrift server,
+             * by prepending the service name to the function name during function calls.
+             *
+             * \note THIS IS NOT USED BY SERVERS.  On the server, use 
+             * {@link apache::thrift::TMultiplexedProcessor TMultiplexedProcessor} to handle requests
+             * from a multiplexing client.
+             *
+             * This example uses a single socket transport to invoke two services:
+             *
+             * <blockquote><code>
+             *     shared_ptr<TSocket> transport(new TSocket("localhost", 9090));
+             *     transport->open();
+             *
+             *     shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+             *
+             *     shared_ptr<TMultiplexedProtocol> mp1(new TMultiplexedProtocol(protocol, "Calculator"));
+             *     shared_ptr<CalculatorClient> service1(new CalculatorClient(mp1));
+             *
+             *     shared_ptr<TMultiplexedProtocol> mp2(new TMultiplexedProtocol(protocol, "WeatherReport"));
+             *     shared_ptr<WeatherReportClient> service2(new WeatherReportClient(mp2));
+             *
+             *     service1->add(2,2);
+             *     int temp = service2->getTemperature();
+             * </code></blockquote>
+             *
+             * @see apache::thrift::protocol::TProtocolDecorator
+             */
+             class TMultiplexedProtocol : public TProtocolDecorator
+            {
+            public:
+                /**
+                 * Wrap the specified protocol, allowing it to be used to communicate with a
+                 * multiplexing server.  The <code>serviceName</code> is required as it is
+                 * prepended to the message header so that the multiplexing server can broker
+                 * the function call to the proper service.
+                 *
+                 * \param _protocol    Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
+                 * \param _serviceName The service name of the service communicating via this protocol.
+                 */
+                 TMultiplexedProtocol( shared_ptr<TProtocol> _protocol, const std::string& _serviceName ) 
+                    : TProtocolDecorator(_protocol),
+                      serviceName(_serviceName),
+                      separator(":")
+                { }
+                virtual ~TMultiplexedProtocol() {}
+
+                /**
+                 * Prepends the service name to the function name, separated by TMultiplexedProtocol::SEPARATOR.
+                 *
+                 * \param [in] _name   The name of the method to be called in the service.
+                 * \param [in] _type   The type of message
+                 * \param [in] _name   The sequential id of the message
+                 *
+                 * \throws TException  Passed through from wrapped <code>TProtocol</code> instance.
+                 */
+                uint32_t writeMessageBegin_virt(
+                    const std::string& _name, 
+                    const TMessageType _type, 
+                    const int32_t _seqid);
+            private:
+                const std::string serviceName;
+                const std::string separator;
+            };
+
+        }
+    }
+}
+
+#endif // THRIFT_TMULTIPLEXEDPROTOCOL_H_
diff --git a/lib/cpp/src/thrift/protocol/TProtocolDecorator.h b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h
new file mode 100644
index 0000000..7850bc5
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TPROTOCOLDECORATOR_H_
+#define THRIFT_TPROTOCOLDECORATOR_H_ 1
+
+#include <thrift/protocol/TProtocol.h>
+#include <boost/shared_ptr.hpp>
+
+namespace apache 
+{ 
+    namespace thrift 
+    { 
+        namespace protocol 
+        {
+            using boost::shared_ptr;
+
+            /**
+             * <code>TProtocolDecorator</code> forwards all requests to an enclosed
+             * <code>TProtocol</code> instance, providing a way to author concise
+             * concrete decorator subclasses.  
+             *
+             * <p>See p.175 of Design Patterns (by Gamma et al.)</p>
+             * 
+             * @see apache::thrift::protocol::TMultiplexedProtocol
+             */
+            class TProtocolDecorator : public TProtocol
+            {
+            public:
+                virtual ~TProtocolDecorator() {}
+                
+                // Desc: Initializes the protocol decorator object.
+                TProtocolDecorator( shared_ptr<TProtocol> proto ) 
+                    : TProtocol(proto->getTransport()), protocol(proto)
+                {
+                }
+
+                virtual uint32_t writeMessageBegin_virt(
+                    const std::string& name, 
+                    const TMessageType messageType, 
+                    const int32_t seqid)
+                {
+                    return protocol->writeMessageBegin(name, messageType, seqid); 
+                }
+                virtual uint32_t writeMessageEnd_virt() { return protocol->writeMessageEnd(); }
+                virtual uint32_t writeStructBegin_virt(const char* name) { return protocol->writeStructBegin(name); }
+                virtual uint32_t writeStructEnd_virt() { return protocol->writeStructEnd(); }
+
+                virtual uint32_t writeFieldBegin_virt(const char* name,
+                    const TType fieldType,
+                    const int16_t fieldId) { return protocol->writeFieldBegin(name,fieldType,fieldId); }
+
+                virtual uint32_t writeFieldEnd_virt()  { return protocol->writeFieldEnd(); }
+                virtual uint32_t writeFieldStop_virt() { return protocol->writeFieldStop(); }
+
+                virtual uint32_t writeMapBegin_virt(const TType keyType,
+                    const TType valType,
+                    const uint32_t size) { return protocol->writeMapBegin(keyType,valType,size); }
+
+                virtual uint32_t writeMapEnd_virt() { return protocol->writeMapEnd(); }
+
+                virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeListBegin(elemType,size); }
+                virtual uint32_t writeListEnd_virt() { return protocol->writeListEnd(); }
+
+                virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeSetBegin(elemType,size); }
+                virtual uint32_t writeSetEnd_virt() { return protocol->writeSetEnd(); }
+
+                virtual uint32_t writeBool_virt(const bool value)  { return protocol->writeBool(value); }
+                virtual uint32_t writeByte_virt(const int8_t byte) { return protocol->writeByte(byte); }
+                virtual uint32_t writeI16_virt(const int16_t i16)  { return protocol->writeI16(i16); }
+                virtual uint32_t writeI32_virt(const int32_t i32)  { return protocol->writeI32(i32); }
+                virtual uint32_t writeI64_virt(const int64_t i64)  { return protocol->writeI64(i64); }
+
+                virtual uint32_t writeDouble_virt(const double dub) { return protocol->writeDouble(dub); }
+                virtual uint32_t writeString_virt(const std::string& str) { return protocol->writeString(str); }
+                virtual uint32_t writeBinary_virt(const std::string& str) { return protocol->writeBinary(str); }
+
+                virtual uint32_t readMessageBegin_virt(std::string& name, TMessageType& messageType, int32_t& seqid) { return protocol->readMessageBegin(name,messageType,seqid); }
+                virtual uint32_t readMessageEnd_virt() { return protocol->readMessageEnd(); }
+
+                virtual uint32_t readStructBegin_virt(std::string& name) { return protocol->readStructBegin(name); }
+                virtual uint32_t readStructEnd_virt() { return protocol->readStructEnd(); }
+
+                virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) { return protocol->readFieldBegin(name, fieldType, fieldId); }
+                virtual uint32_t readFieldEnd_virt() { return protocol->readFieldEnd(); }
+
+                virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) { return protocol->readMapBegin(keyType,valType,size); }
+                virtual uint32_t readMapEnd_virt() { return protocol->readMapEnd(); }
+
+                virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) { return protocol->readListBegin(elemType,size); }
+                virtual uint32_t readListEnd_virt() { return protocol->readListEnd(); }
+
+                virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) { return protocol->readSetBegin(elemType,size); }
+                virtual uint32_t readSetEnd_virt() { return protocol->readSetEnd(); }
+
+                virtual uint32_t readBool_virt(bool& value) { return protocol->readBool(value); }
+                virtual uint32_t readBool_virt(std::vector<bool>::reference value) { return protocol->readBool(value); }
+                
+                virtual uint32_t readByte_virt(int8_t& byte) { return protocol->readByte(byte); }
+
+                virtual uint32_t readI16_virt(int16_t& i16) { return protocol->readI16(i16); }
+                virtual uint32_t readI32_virt(int32_t& i32) { return protocol->readI32(i32); }
+                virtual uint32_t readI64_virt(int64_t& i64) { return protocol->readI64(i64); }
+
+                virtual uint32_t readDouble_virt(double& dub) { return protocol->readDouble(dub); }
+
+                virtual uint32_t readString_virt(std::string& str) { return protocol->readString(str); }
+                virtual uint32_t readBinary_virt(std::string& str) { return protocol->readBinary(str); }
+
+            private:
+                shared_ptr<TProtocol> protocol;    
+            };
+        }
+    }
+} 
+
+#endif // THRIFT_TPROTOCOLDECORATOR_H_