erlang: Refactor the processor
Now the server works.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990986 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl
index b751da6..9924a2c 100644
--- a/lib/erl/src/thrift_processor.erl
+++ b/lib/erl/src/thrift_processor.erl
@@ -24,53 +24,53 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(thrift_processor, {handler, in_protocol, out_protocol, service}).
+-record(thrift_processor, {handler, protocol, service}).
init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
- {ok, IProt, OProt} = ProtoGen(),
- loop(#thrift_processor{in_protocol = IProt,
- out_protocol = OProt,
+ {ok, Proto} = ProtoGen(),
+ loop(#thrift_processor{protocol = Proto,
service = Service,
handler = Handler}).
-loop(State = #thrift_processor{in_protocol = IProto,
- out_protocol = OProto}) ->
- case thrift_protocol:read(IProto, message_begin) of
+loop(State0 = #thrift_processor{protocol = Proto0}) ->
+ {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
+ State1 = State0#thrift_processor{protocol = Proto1},
+ case MessageBegin of
#protocol_message_begin{name = Function,
type = ?tMessageType_CALL} ->
- ok = handle_function(State, list_to_atom(Function)),
- loop(State);
+ {State2, ok} = handle_function(State1, list_to_atom(Function)),
+ loop(State2);
#protocol_message_begin{name = Function,
type = ?tMessageType_ONEWAY} ->
- ok = handle_function(State, list_to_atom(Function)),
- loop(State);
+ {State2, ok} = handle_function(State1, list_to_atom(Function)),
+ loop(State2);
{error, timeout} ->
- thrift_protocol:close_transport(OProto),
+ thrift_protocol:close_transport(Proto1),
ok;
{error, closed} ->
%% error_logger:info_msg("Client disconnected~n"),
- thrift_protocol:close_transport(OProto),
+ thrift_protocol:close_transport(Proto1),
exit(shutdown)
end.
-handle_function(State=#thrift_processor{in_protocol = IProto,
- out_protocol = OProto,
- handler = Handler,
- service = Service},
+handle_function(State0=#thrift_processor{protocol = Proto0,
+ handler = Handler,
+ service = Service},
Function) ->
InParams = Service:function_info(Function, params_type),
- {ok, Params} = thrift_protocol:read(IProto, InParams),
+ {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams),
+ State1 = State0#thrift_processor{protocol = Proto1},
try
Result = Handler:handle_function(Function, Params),
%% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]),
%% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n",
%% [Function, Params, Micro/1000.0]),
- handle_success(State, Function, Result)
+ handle_success(State1, Function, Result)
catch
Type:Data when Type =:= throw orelse Type =:= error ->
- handle_function_catch(State, Function, Type, Data)
+ handle_function_catch(State1, Function, Type, Data)
end.
handle_function_catch(State = #thrift_processor{service = Service},
@@ -83,39 +83,37 @@
error_logger:warning_msg(
"oneway void ~p threw error which must be ignored: ~p",
[Function, {ErrType, ErrData, Stack}]),
- ok;
+ {State, ok};
{throw, Exception} when is_tuple(Exception), size(Exception) > 0 ->
%error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]),
- handle_exception(State, Function, Exception),
- ok; % we still want to accept more requests from this client
+ handle_exception(State, Function, Exception);
+ % we still want to accept more requests from this client
{error, Error} ->
- ok = handle_error(State, Function, Error)
+ handle_error(State, Function, Error)
end.
-handle_success(State = #thrift_processor{out_protocol = OProto,
- service = Service},
+handle_success(State = #thrift_processor{service = Service},
Function,
Result) ->
ReplyType = Service:function_info(Function, reply_type),
StructName = atom_to_list(Function) ++ "_result",
- ok = case Result of
- {reply, ReplyData} ->
- Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
- send_reply(OProto, Function, ?tMessageType_REPLY, Reply);
+ case Result of
+ {reply, ReplyData} ->
+ Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
+ send_reply(State, Function, ?tMessageType_REPLY, Reply);
- ok when ReplyType == {struct, []} ->
- send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
+ ok when ReplyType == {struct, []} ->
+ send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
- ok when ReplyType == oneway_void ->
- %% no reply for oneway void
- ok
- end.
+ ok when ReplyType == oneway_void ->
+ %% no reply for oneway void
+ {State, ok}
+ end.
-handle_exception(State = #thrift_processor{out_protocol = OProto,
- service = Service},
+handle_exception(State = #thrift_processor{service = Service},
Function,
Exception) ->
ExceptionType = element(1, Exception),
@@ -140,9 +138,9 @@
% Make sure we got at least one defined
case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of
true ->
- ok = handle_unknown_exception(State, Function, Exception);
+ handle_unknown_exception(State, Function, Exception);
false ->
- ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
+ send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
end.
%%
@@ -153,7 +151,7 @@
handle_error(State, Function, {exception_not_declared_as_thrown,
Exception}).
-handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) ->
+handle_error(State, Function, Error) ->
Stack = erlang:get_stacktrace(),
error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]),
@@ -169,14 +167,14 @@
#'TApplicationException'{
message = Message,
type = ?TApplicationException_UNKNOWN}},
- send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply).
+ send_reply(State, Function, ?tMessageType_EXCEPTION, Reply).
-send_reply(OProto, Function, ReplyMessageType, Reply) ->
- ok = thrift_protocol:write(OProto, #protocol_message_begin{
- name = atom_to_list(Function),
- type = ReplyMessageType,
- seqid = 0}),
- ok = thrift_protocol:write(OProto, Reply),
- ok = thrift_protocol:write(OProto, message_end),
- ok = thrift_protocol:flush_transport(OProto),
- ok.
+send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) ->
+ {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{
+ name = atom_to_list(Function),
+ type = ReplyMessageType,
+ seqid = 0}),
+ {Proto2, ok} = thrift_protocol:write(Proto1, Reply),
+ {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
+ {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
+ {State#thrift_processor{protocol = Proto4}, ok}.
diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl
index 5d0012b..80a1388 100644
--- a/lib/erl/src/thrift_server.erl
+++ b/lib/erl/src/thrift_server.erl
@@ -177,7 +177,7 @@
{ok, SocketTransport} = thrift_socket_transport:new(Socket),
{ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
{ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
- {ok, Protocol, Protocol}
+ {ok, Protocol}
end,
spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]).
diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl
index 6794e63..44894b0 100644
--- a/lib/erl/src/thrift_socket_server.erl
+++ b/lib/erl/src/thrift_socket_server.erl
@@ -188,7 +188,7 @@
false -> thrift_buffered_transport:new(SocketTransport)
end,
{ok, Protocol} = thrift_binary_protocol:new(Transport),
- {ok, IProt=Protocol, OProt=Protocol}
+ {ok, Protocol}
end,
thrift_processor:init({Server, ProtoGen, Service, Handler});
{error, closed} ->