THRIFT-2110 Erlang: Support for Multiplexing Services on any Transport, Protocol and Server
Client: Erlang
Patch: David Robakowski rebased by Nobuaki Sukegawa
Modification: Fix the return value in thrift_client; the problem was uncovered by the added tests
diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl
index d474294..5c9f26f 100644
--- a/lib/erl/src/thrift_processor.erl
+++ b/lib/erl/src/thrift_processor.erl
@@ -33,41 +33,53 @@
handler = Handler}).
loop(State0 = #thrift_processor{protocol = Proto0,
- handler = Handler}) ->
+ handler = Handler,
+ service = Service}) ->
+
{Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
State1 = State0#thrift_processor{protocol = Proto1},
+
+ ErrorHandler = fun
+ (HandlerModules) when is_list(HandlerModules) -> thrift_multiplexed_map_wrapper:fetch(?MULTIPLEXED_ERROR_HANDLER_KEY, HandlerModules);
+ (HandlerModule) -> HandlerModule
+ end,
+
case MessageBegin of
+
#protocol_message_begin{name = Function,
- type = ?tMessageType_CALL,
- seqid = Seqid} ->
- case handle_function(State1, list_to_atom(Function), Seqid) of
- {State2, ok} -> loop(State2);
- {_State2, {error, Reason}} ->
- Handler:handle_error(list_to_atom(Function), Reason),
- thrift_protocol:close_transport(Proto1),
- ok
- end;
- #protocol_message_begin{name = Function,
- type = ?tMessageType_ONEWAY,
- seqid = Seqid} ->
- case handle_function(State1, list_to_atom(Function), Seqid) of
- {State2, ok} -> loop(State2);
- {_State2, {error, Reason}} ->
- Handler:handle_error(list_to_atom(Function), Reason),
- thrift_protocol:close_transport(Proto1),
- ok
+ type = Type,
+ seqid = Seqid} when Type =:= ?tMessageType_CALL; Type =:= ?tMessageType_ONEWAY ->
+ case string:tokens(Function, ?MULTIPLEXED_SERVICE_SEPARATOR) of
+ [ServiceName, FunctionName] ->
+ ServiceModule = thrift_multiplexed_map_wrapper:fetch(ServiceName, Service),
+ ServiceHandler = thrift_multiplexed_map_wrapper:fetch(ServiceName, Handler),
+ case handle_function(State1#thrift_processor{service=ServiceModule, handler=ServiceHandler}, list_to_atom(FunctionName), Seqid) of
+ {State2, ok} -> loop(State2#thrift_processor{service=Service, handler=Handler});
+ {_State2, {error, Reason}} ->
+ apply(ErrorHandler(Handler), handle_error, [list_to_atom(Function), Reason]),
+ thrift_protocol:close_transport(Proto1),
+ ok
+ end;
+ _ ->
+ case handle_function(State1, list_to_atom(Function), Seqid) of
+ {State2, ok} -> loop(State2);
+ {_State2, {error, Reason}} ->
+ apply(ErrorHandler(Handler), handle_error, [list_to_atom(Function), Reason]),
+ thrift_protocol:close_transport(Proto1),
+ ok
+ end
end;
{error, timeout = Reason} ->
- Handler:handle_error(undefined, Reason),
+ apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
thrift_protocol:close_transport(Proto1),
ok;
{error, closed = Reason} ->
%% error_logger:info_msg("Client disconnected~n"),
- Handler:handle_error(undefined, Reason),
+ apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
thrift_protocol:close_transport(Proto1),
exit(shutdown);
{error, Reason} ->
- Handler:handle_error(undefined, Reason),
+ apply(ErrorHandler(Handler), handle_error, [undefined, Reason]),
thrift_protocol:close_transport(Proto1),
exit(shutdown)
end.