THRIFT-2110 Erlang: Support for Multiplexing Services on any Transport, Protocol and Server
Client: Erlang
Patch: David Robakowski rebased by Nobuaki Sukegawa

Modification: Return value fix in thrift_client uncovered by added tests
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
index 209bd4c..7bf50a5 100644
--- a/lib/erl/src/thrift_client.erl
+++ b/lib/erl/src/thrift_client.erl
@@ -39,6 +39,7 @@
 when is_atom(Function), is_list(Args) ->
   case send_function_call(Client, Function, Args) of
     {ok, Client1} -> receive_function_result(Client1, Function);
+    {{error, X}, Client1} -> {Client1, {error, X}};
     Else -> Else
   end.
 
@@ -66,7 +67,7 @@
 send_function_call(Client = #tclient{service = Service}, Function, Args) ->
   {Params, Reply} = try
     {Service:function_info(Function, params_type), Service:function_info(Function, reply_type)}
-  catch error:function_clause -> no_function
+  catch error:function_clause -> {no_function, 0}
   end,
   MsgType = case Reply of
     oneway_void -> ?tMessageType_ONEWAY;
diff --git a/lib/erl/src/thrift_client_util.erl b/lib/erl/src/thrift_client_util.erl
index 265c308..1dbe51e 100644
--- a/lib/erl/src/thrift_client_util.erl
+++ b/lib/erl/src/thrift_client_util.erl
@@ -20,6 +20,11 @@
 -module(thrift_client_util).
 
 -export([new/4]).
+-export([new_multiplexed/3, new_multiplexed/4]).
+
+-type service_name()            :: nonempty_string().
+-type service_module()          :: atom().
+-type multiplexed_service_map() :: [{ServiceName::service_name(), ServiceModule::service_module()}].
 
 %%
 %% Splits client options into client, protocol, and transport options
@@ -76,3 +81,32 @@
         {error, Error} ->
             {error, Error}
     end.
+
+-spec new_multiplexed(Host, Port, Services, Options) -> {ok, ServiceThriftClientList} when
+    Host        :: nonempty_string(),
+    Port        :: non_neg_integer(),
+    Services    :: multiplexed_service_map(),
+    Options     :: list(),
+    ServiceThriftClientList :: [{ServiceName::list(), ThriftClient::term()}].
+new_multiplexed(Host, Port, Services, Options) when is_integer(Port),
+                                                    is_list(Services),
+                                                    is_list(Options) ->
+    new_multiplexed(thrift_socket_transport:new_transport_factory(Host, Port, Options), Services, Options).
+
+-spec new_multiplexed(TransportFactoryTuple, Services, Options) -> {ok, ServiceThriftClientList} when
+    TransportFactoryTuple   :: {ok, TransportFactory::term()},
+    Services                :: multiplexed_service_map(),
+    Options                 :: list(),
+    ServiceThriftClientList :: [{ServiceName::service_name(), ThriftClient::term()}].
+new_multiplexed(TransportFactoryTuple, Services, Options) when is_list(Services),
+                                                               is_list(Options),
+                                                               is_tuple(TransportFactoryTuple) ->
+    {ProtoOpts, _} = split_options(Options),
+
+    {ok, TransportFactory} = TransportFactoryTuple,
+
+    {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(TransportFactory, ProtoOpts),
+
+    {ok, Protocol} = ProtocolFactory(),
+
+    {ok, [{ServiceName, element(2, thrift_client:new(element(2, thrift_multiplexed_protocol:new(Protocol, ServiceName)), Service))} || {ServiceName, Service} <- Services]}.
diff --git a/lib/erl/src/thrift_multiplexed_map_wrapper.erl b/lib/erl/src/thrift_multiplexed_map_wrapper.erl
new file mode 100644
index 0000000..34c5e95
--- /dev/null
+++ b/lib/erl/src/thrift_multiplexed_map_wrapper.erl
@@ -0,0 +1,57 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_multiplexed_map_wrapper).
+
+-export([
+          new/0
+         ,store/3
+         ,find/2
+         ,fetch/2
+        ]).
+
+-type service_handler()     :: nonempty_string().
+-type module_()             :: atom().
+-type service_handler_map() :: [{ServiceHandler::service_handler(), Module::module_()}].
+
+-spec new() -> service_handler_map().
+new() ->
+    orddict:new().
+
+-spec store(ServiceHandler, Module, Map) -> NewMap when
+    ServiceHandler :: service_handler(),
+    Module         :: module_(),
+    Map            :: service_handler_map(),
+    NewMap         :: service_handler_map().
+store(ServiceHandler, Module, Map) ->
+    orddict:store(ServiceHandler, Module, Map).
+
+-spec find(ServiceHandler, Map) -> {ok, Module} | error when
+    ServiceHandler :: service_handler(),
+    Module         :: module_(),
+    Map            :: service_handler_map().
+find(ServiceHandler, Map) ->
+    orddict:find(ServiceHandler, Map).
+
+-spec fetch(ServiceHandler, Map) -> Module when
+    ServiceHandler :: service_handler(),
+    Module         :: module_(),
+    Map            :: service_handler_map().
+fetch(ServiceHandler, Map) ->
+    orddict:fetch(ServiceHandler, Map).
diff --git a/lib/erl/src/thrift_multiplexed_protocol.erl b/lib/erl/src/thrift_multiplexed_protocol.erl
new file mode 100644
index 0000000..5f7b70c
--- /dev/null
+++ b/lib/erl/src/thrift_multiplexed_protocol.erl
@@ -0,0 +1,83 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_multiplexed_protocol).
+
+-behaviour(thrift_protocol).
+
+-include("thrift_constants.hrl").
+-include("thrift_protocol.hrl").
+
+-include("thrift_protocol_behaviour.hrl").
+
+-export([new/2,
+         read/2,
+         write/2,
+         flush_transport/1,
+         close_transport/1
+        ]).
+
+-record(protocol, {module, data}).
+-type protocol() :: #protocol{}.
+
+-record (multiplexed_protocol, {protocol_module_to_decorate::atom(),
+								protocol_data_to_decorate::term(),
+								service_name::nonempty_string()}).
+
+-type state() :: #multiplexed_protocol{}.
+
+-spec new(ProtocolToDecorate::protocol(), ServiceName::nonempty_string()) -> {ok, Protocol::protocol()}.
+new(ProtocolToDecorate, ServiceName) when is_record(ProtocolToDecorate, protocol),
+                                          is_list(ServiceName) ->
+    State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolToDecorate#protocol.module,
+                                    protocol_data_to_decorate = ProtocolToDecorate#protocol.data,
+                                                 service_name = ServiceName},
+    thrift_protocol:new(?MODULE, State).
+
+flush_transport(State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolModuleToDecorate,
+                                                protocol_data_to_decorate = State0}) ->
+    {State1, ok} = ProtocolModuleToDecorate:flush_transport(State0),
+    {State#multiplexed_protocol{protocol_data_to_decorate = State1}, ok}.
+
+close_transport(State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolModuleToDecorate,
+                                                protocol_data_to_decorate = State0}) ->
+    {State1, ok} = ProtocolModuleToDecorate:close_transport(State0),
+    {State#multiplexed_protocol{protocol_data_to_decorate = State1}, ok}.
+
+write(State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolModuleToDecorate,
+                                      protocol_data_to_decorate = State0,
+                                                   service_name = ServiceName},
+      Message = #protocol_message_begin{name = Name}) ->
+    {State1, ok} = ProtocolModuleToDecorate:write(State0,
+                                                  Message#protocol_message_begin{name=ServiceName ++
+                                                                                      ?MULTIPLEXED_SERVICE_SEPARATOR ++
+                                                                                      Name}),
+    {State#multiplexed_protocol{protocol_data_to_decorate = State1}, ok};
+
+write(State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolModuleToDecorate,
+                                      protocol_data_to_decorate = State0},
+      Message) ->
+    {State1, ok} = ProtocolModuleToDecorate:write(State0, Message),
+    {State#multiplexed_protocol{protocol_data_to_decorate = State1}, ok}.
+
+read(State = #multiplexed_protocol{protocol_module_to_decorate = ProtocolModuleToDecorate,
+                                     protocol_data_to_decorate = State0},
+     Message) ->
+    {State1, Result} = ProtocolModuleToDecorate:read(State0, Message),
+    {State#multiplexed_protocol{protocol_data_to_decorate = State1}, Result}.
diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl
index d474294..5c9f26f 100644
--- a/lib/erl/src/thrift_processor.erl
+++ b/lib/erl/src/thrift_processor.erl
@@ -33,41 +33,53 @@
                            handler = Handler}).
 
 loop(State0 = #thrift_processor{protocol  = Proto0,
-                                handler = Handler}) ->
+                                handler = Handler,
+                                service = Service}) ->
+
     {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
     State1 = State0#thrift_processor{protocol = Proto1},
+
+    ErrorHandler = fun
+        (HandlerModules) when is_list(HandlerModules) -> thrift_multiplexed_map_wrapper:fetch(?MULTIPLEXED_ERROR_HANDLER_KEY, HandlerModules);
+        (HandlerModule) -> HandlerModule
+    end,
+
     case MessageBegin of
+
         #protocol_message_begin{name = Function,
-                                type = ?tMessageType_CALL,
-                                seqid = Seqid} ->
-            case handle_function(State1, list_to_atom(Function), Seqid) of
-                {State2, ok} -> loop(State2);
-                {_State2, {error, Reason}} ->
-                    Handler:handle_error(list_to_atom(Function), Reason),
-                    thrift_protocol:close_transport(Proto1),
-                    ok
-            end;
-        #protocol_message_begin{name = Function,
-                                type = ?tMessageType_ONEWAY, 
-                                seqid = Seqid} ->
-            case handle_function(State1, list_to_atom(Function), Seqid) of
-                {State2, ok} -> loop(State2);
-                {_State2, {error, Reason}} ->
-                    Handler:handle_error(list_to_atom(Function), Reason),
-                    thrift_protocol:close_transport(Proto1),
-                    ok
+                                type = Type,
+                                seqid = Seqid} when Type =:= ?tMessageType_CALL; Type =:= ?tMessageType_ONEWAY ->
+            case string:tokens(Function, ?MULTIPLEXED_SERVICE_SEPARATOR) of
+                [ServiceName, FunctionName] ->
+                    ServiceModule  = thrift_multiplexed_map_wrapper:fetch(ServiceName, Service),
+                    ServiceHandler = thrift_multiplexed_map_wrapper:fetch(ServiceName, Handler),
+                    case handle_function(State1#thrift_processor{service=ServiceModule, handler=ServiceHandler}, list_to_atom(FunctionName), Seqid) of
+                        {State2, ok} -> loop(State2#thrift_processor{service=Service, handler=Handler});
+                        {_State2, {error, Reason}} ->
+							apply(ErrorHandler(Handler), handle_error, [list_to_atom(Function), Reason]),
+                            thrift_protocol:close_transport(Proto1),
+                            ok
+                    end;
+                _ ->
+                    case handle_function(State1, list_to_atom(Function), Seqid) of
+                        {State2, ok} -> loop(State2);
+                        {_State2, {error, Reason}} ->
+							apply(ErrorHandler(Handler), handle_error, [list_to_atom(Function), Reason]),
+                            thrift_protocol:close_transport(Proto1),
+                            ok
+                    end
             end;
         {error, timeout = Reason} ->
-            Handler:handle_error(undefined, Reason),
+			apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
             thrift_protocol:close_transport(Proto1),
             ok;
         {error, closed = Reason} ->
             %% error_logger:info_msg("Client disconnected~n"),
-            Handler:handle_error(undefined, Reason),
+			apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
             thrift_protocol:close_transport(Proto1),
             exit(shutdown);
         {error, Reason} ->
-            Handler:handle_error(undefined, Reason),
+			apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
             thrift_protocol:close_transport(Proto1),
             exit(shutdown)
     end.
diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl
index e9ad6f4..4e3c052 100644
--- a/lib/erl/src/thrift_socket_server.erl
+++ b/lib/erl/src/thrift_socket_server.erl
@@ -21,12 +21,19 @@
 
 -behaviour(gen_server).
 
--export([start/1, stop/1]).
+-include ("thrift_constants.hrl").
 
--export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
-         handle_info/2]).
+-ifdef(TEST).
+    -compile(export_all).
+    -export_records([thrift_socket_server]).
+-else.
+    -export([start/1, stop/1]).
 
--export([acceptor_loop/1]).
+    -export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
+             handle_info/2]).
+
+    -export([acceptor_loop/1]).
+-endif.
 
 -record(thrift_socket_server,
         {port,
@@ -94,10 +101,46 @@
     parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
 parse_options([{socket_opts, L} | Rest], State) when is_list(L), length(L) > 0 ->
     parse_options(Rest, State#thrift_socket_server{socket_opts=L});
-parse_options([{handler, Handler} | Rest], State) ->
+
+parse_options([{handler, []} | _Rest], _State) ->
+    throw("At least an error handler must be defined.");
+parse_options([{handler, ServiceHandlerPropertyList} | Rest], State) when is_list(ServiceHandlerPropertyList) ->
+    ServiceHandlerMap =
+    case State#thrift_socket_server.handler of
+        undefined ->
+            lists:foldl(
+                fun ({ServiceName, ServiceHandler}, Acc) when is_list(ServiceName), is_atom(ServiceHandler) ->
+                        thrift_multiplexed_map_wrapper:store(ServiceName, ServiceHandler, Acc);
+                    (_, _Acc) ->
+                        throw("The handler option is not properly configured for multiplexed services. It should be a kind of [{\"error_handler\", Module::atom()}, {SericeName::list(), Module::atom()}, ...]")
+                end, thrift_multiplexed_map_wrapper:new(), ServiceHandlerPropertyList);
+        _ -> throw("Error while parsing the handler option.")
+    end,
+    case thrift_multiplexed_map_wrapper:find(?MULTIPLEXED_ERROR_HANDLER_KEY, ServiceHandlerMap) of
+        {ok, _ErrorHandler} -> parse_options(Rest, State#thrift_socket_server{handler=ServiceHandlerMap});
+        error -> throw("The handler option is not properly configured for multiplexed services. It should be a kind of [{\"error_handler\", Module::atom()}, {SericeName::list(), Module::atom()}, ...]")
+    end;
+parse_options([{handler, Handler} | Rest], State) when State#thrift_socket_server.handler == undefined, is_atom(Handler) ->
     parse_options(Rest, State#thrift_socket_server{handler=Handler});
-parse_options([{service, Service} | Rest], State) ->
+
+parse_options([{service, []} | _Rest], _State) ->
+    throw("At least one service module must be defined.");
+parse_options([{service, ServiceModulePropertyList} | Rest], State) when is_list(ServiceModulePropertyList) ->
+    ServiceModuleMap =
+    case State#thrift_socket_server.service of
+        undefined ->
+            lists:foldl(
+                fun ({ServiceName, ServiceModule}, Acc) when is_list(ServiceName), is_atom(ServiceModule) ->
+                        thrift_multiplexed_map_wrapper:store(ServiceName, ServiceModule, Acc);
+                    (_, _Acc) ->
+                        throw("The service option is not properly configured for multiplexed services. It should be a kind of [{SericeName::list(), ServiceModule::atom()}, ...]")
+                end, thrift_multiplexed_map_wrapper:new(), ServiceModulePropertyList);
+        _ -> throw("Error while parsing the service option.")
+    end,
+    parse_options(Rest, State#thrift_socket_server{service=ServiceModuleMap});
+parse_options([{service, Service} | Rest], State) when State#thrift_socket_server.service == undefined, is_atom(Service) ->
     parse_options(Rest, State#thrift_socket_server{service=Service});
+
 parse_options([{max, Max} | Rest], State) ->
     MaxInt = case Max of
                  Max when is_list(Max) ->