Rollback a few recent Erlang changes to fix blame data
My combined patch for THRIFT-599 was committed, but it is preferable
commit the individual patches to preserve the more detailed log and
blame data. I'll recommit r987018 as a sequence of patches and r988722
as its own rev.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990957 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/README b/lib/erl/README
index 667c549..ddb6946 100644
--- a/lib/erl/README
+++ b/lib/erl/README
@@ -25,19 +25,32 @@
Example session using thrift_client:
-1> {ok, C0} = thrift_client_util:new("localhost", 9090, thriftTest_thrift, []), ok.
-ok
-2> {C1, R1} = thrift_client:call(C0, testVoid, []), R1.
+118> f(), {ok, C} = thrift_client:start_link("localhost", 9090, thriftTest_thrif
+t).
+{ok,<0.271.0>}
+119> thrift_client:call(C, testVoid, []).
{ok,ok}
-3> {C2, R2} = thrift_client:call(C1, testVoid, [asdf]), R2.
+120> thrift_client:call(C, testVoid, [asdf]).
{error,{bad_args,testVoid,[asdf]}}
-4> {C3, R3} = thrift_client:call(C2, testI32, [123]), R3.
+121> thrift_client:call(C, testI32, [123]).
{ok,123}
-5> {C4, R4} = thrift_client:call(C3, testOneway, [1]), R4.
+122> thrift_client:call(C, testOneway, [1]).
{ok,ok}
-6> {C5, R5} = thrift_client:call(C4, testXception, ["foo"]), R5.
+123> catch thrift_client:call(C, testXception, ["foo"]).
{error,{no_function,testXception}}
-7> {C6, R6} = thrift_client:call(C5, testException, ["foo"]), R6.
+124> catch thrift_client:call(C, testException, ["foo"]).
{ok,ok}
-8> {C7, R7} = (catch thrift_client:call(C6, testException, ["Xception"])), R7.
-{exception,{xception,1001,<<"Xception">>}}
+125> catch thrift_client:call(C, testException, ["Xception"]).
+{xception,1001,"This is an Xception"}
+126> thrift_client:call(C, testException, ["Xception"]).
+
+=ERROR REPORT==== 24-Feb-2008::23:00:23 ===
+Error in process <0.269.0> with exit value: {{nocatch,{xception,1001,"This is an
+ Xception"}},[{thrift_client,call,3},{erl_eval,do_apply,5},{shell,exprs,6},{shel
+l,eval_loop,3}]}
+
+** exited: {{nocatch,{xception,1001,"This is an Xception"}},
+ [{thrift_client,call,3},
+ {erl_eval,do_apply,5},
+ {shell,exprs,6},
+ {shell,eval_loop,3}]} **
diff --git a/lib/erl/build/otp.mk b/lib/erl/build/otp.mk
index 0e0381e..1d16e2c 100644
--- a/lib/erl/build/otp.mk
+++ b/lib/erl/build/otp.mk
@@ -25,6 +25,7 @@
# MHOST is the host where this Makefile runs.
MHOST=${shell hostname -s}
+ERL_COMPILE_FLAGS+=-W0
# The location of the erlang runtime system.
ifndef ERL_RUN_TOP
diff --git a/lib/erl/include/thrift_protocol.hrl b/lib/erl/include/thrift_protocol.hrl
index f85f455..f4e1901 100644
--- a/lib/erl/include/thrift_protocol.hrl
+++ b/lib/erl/include/thrift_protocol.hrl
@@ -18,7 +18,7 @@
%%
-ifndef(THRIFT_PROTOCOL_INCLUDED).
--define(THRIFT_PROTOCOL_INCLUDED, true).
+-define(THRIFT_PROTOCOL_INCLUDED, yea).
-record(protocol_message_begin, {name, type, seqid}).
-record(protocol_struct_begin, {name}).
@@ -27,40 +27,5 @@
-record(protocol_list_begin, {etype, size}).
-record(protocol_set_begin, {etype, size}).
--type tprot_header_val() :: #protocol_message_begin{}
- | #protocol_struct_begin{}
- | #protocol_field_begin{}
- | #protocol_map_begin{}
- | #protocol_list_begin{}
- | #protocol_set_begin{}
- .
--type tprot_empty_tag() :: message_end
- | struct_begin
- | struct_end
- | field_end
- | map_end
- | list_end
- | set_end
- .
--type tprot_header_tag() :: message_begin
- | field_begin
- | map_begin
- | list_begin
- | set_begin
- .
--type tprot_data_tag() :: ui32
- | bool
- | byte
- | i16
- | i32
- | i64
- | double
- | string
- .
--type tprot_cont_tag() :: {list, _Type}
- | {map, _KType, _VType}
- | {set, _Type}
- .
-
-endif.
diff --git a/lib/erl/include/thrift_protocol_behaviour.hrl b/lib/erl/include/thrift_protocol_behaviour.hrl
deleted file mode 100644
index abe300b..0000000
--- a/lib/erl/include/thrift_protocol_behaviour.hrl
+++ /dev/null
@@ -1,37 +0,0 @@
-%%
-%% Licensed to the Apache Software Foundation (ASF) under one
-%% or more contributor license agreements. See the NOTICE file
-%% distributed with this work for additional information
-%% regarding copyright ownership. The ASF licenses this file
-%% to you under the Apache License, Version 2.0 (the
-%% "License"); you may not use this file except in compliance
-%% with the License. You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-
-%% Signature specifications for protocol implementations.
-
--ifndef(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED).
--define(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED, true).
-
--spec flush_transport(state()) -> {state(), ok | {error, _Reason}}.
--spec close_transport(state()) -> {state(), ok | {error, _Reason}}.
-
--spec write(state(), any()) -> {state(), ok | {error, _Reason}}.
-
-%% NOTE: Keep this in sync with thrift_protocol:read and read_specific.
--spec read
- (state(), tprot_empty_tag()) -> {state(), ok | {error, _Reason}};
- (state(), tprot_header_tag()) -> {state(), tprot_header_val() | {error, _Reason}};
- (state(), tprot_data_tag()) -> {state(), {ok, any()} | {error, _Reason}}.
-
-
--endif.
diff --git a/lib/erl/src/Makefile b/lib/erl/src/Makefile
index 78af14f..980af81 100644
--- a/lib/erl/src/Makefile
+++ b/lib/erl/src/Makefile
@@ -27,7 +27,6 @@
MODULES = $(shell find . -name \*.erl | sed 's:^\./::' | sed 's/\.erl//')
MODULES_STRING_LIST = $(shell find . -name \*.erl | sed 's:^\./:":' | sed 's/\.erl/",/')
-BEHAV_MODULES = $(shell find . -name \*.erl | xargs grep -l behaviour_info | sed 's:^\./::' | sed 's/\.erl//')
HRL_FILES=
INTERNAL_HRL_FILES= $(APP_NAME).hrl
@@ -44,8 +43,7 @@
APPUP_TARGET= $(EBIN)/$(APPUP_FILE)
BEAMS= $(MODULES:%=$(EBIN)/%.$(EMULATOR))
-BEHAV_BEAMS= $(BEHAV_MODULES:%=$(EBIN)/%.$(EMULATOR))
-TARGET_FILES= $(BEHAV_BEAMS) $(BEAMS) $(APP_TARGET) $(APPUP_TARGET)
+TARGET_FILES= $(BEAMS) $(APP_TARGET) $(APPUP_TARGET)
WEB_TARGET=/var/yaws/www/$(APP_NAME)
@@ -55,8 +53,7 @@
ERL_FLAGS +=
ERL_INCLUDE = -I../include -I../../fslib/include -I../../system_status/include
-ERL_BEHAV_PATH = -pz ../ebin
-ERL_COMPILE_FLAGS += $(ERL_INCLUDE) $(ERL_BEHAV_PATH)
+ERL_COMPILE_FLAGS += $(ERL_INCLUDE)
# ----------------------------------------------------
# Targets
diff --git a/lib/erl/include/thrift_transport_behaviour.hrl b/lib/erl/src/test_handler.erl
similarity index 63%
rename from lib/erl/include/thrift_transport_behaviour.hrl
rename to lib/erl/src/test_handler.erl
index dbc05aa..28a3acd 100644
--- a/lib/erl/include/thrift_transport_behaviour.hrl
+++ b/lib/erl/src/test_handler.erl
@@ -17,15 +17,10 @@
%% under the License.
%%
-%% Signature specifications for transport implementations.
+-module(test_handler).
--ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED).
--define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true).
+-export([handle_function/2]).
--spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}.
--spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}.
--spec flush(state()) -> {state(), ok | {error, _Reason}}.
--spec close(state()) -> {state(), ok | {error, _Reason}}.
-
-
--endif.
+handle_function(add, Params = {A, B}) ->
+ io:format("Got params: ~p~n", [Params]),
+ {reply, A + B}.
diff --git a/lib/erl/include/thrift_transport_behaviour.hrl b/lib/erl/src/test_service.erl
similarity index 63%
copy from lib/erl/include/thrift_transport_behaviour.hrl
copy to lib/erl/src/test_service.erl
index dbc05aa..7aa4827 100644
--- a/lib/erl/include/thrift_transport_behaviour.hrl
+++ b/lib/erl/src/test_service.erl
@@ -17,15 +17,13 @@
%% under the License.
%%
-%% Signature specifications for transport implementations.
+-module(test_service).
+%
+% Test service definition
--ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED).
--define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true).
+-export([function_info/2]).
--spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}.
--spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}.
--spec flush(state()) -> {state(), ok | {error, _Reason}}.
--spec close(state()) -> {state(), ok | {error, _Reason}}.
-
-
--endif.
+function_info(add, params_type) ->
+ {struct, [{1, i32},
+ {2, i32}]};
+function_info(add, reply_type) -> i32.
diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl
index d31f2ba..9d13151 100644
--- a/lib/erl/src/thrift_base64_transport.erl
+++ b/lib/erl/src/thrift_base64_transport.erl
@@ -29,35 +29,30 @@
%% State
-record(b64_transport, {wrapped}).
--type state() :: #b64_transport{}.
--include("thrift_transport_behaviour.hrl").
new(Wrapped) ->
State = #b64_transport{wrapped = Wrapped},
thrift_transport:new(?MODULE, State).
-write(This = #b64_transport{wrapped = Wrapped}, Data) ->
- {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))),
- {This#b64_transport{wrapped = NewWrapped}, Result}.
+write(#b64_transport{wrapped = Wrapped}, Data) ->
+ thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))).
%% base64 doesn't support reading quite yet since it would involve
%% nasty buffering and such
-read(This = #b64_transport{}, _Data) ->
- {This, {error, no_reads_allowed}}.
+read(#b64_transport{wrapped = Wrapped}, Data) ->
+ {error, no_reads_allowed}.
-flush(This = #b64_transport{wrapped = Wrapped0}) ->
- {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>),
- {Wrapped2, ok} = thrift_transport:flush(Wrapped1),
- {This#b64_transport{wrapped = Wrapped2}, ok}.
+flush(#b64_transport{wrapped = Wrapped}) ->
+ thrift_transport:write(Wrapped, <<"\n">>),
+ thrift_transport:flush(Wrapped).
-close(This0) ->
- {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0),
- {NewWrapped, ok} = thrift_transport:close(Wrapped),
- {This1#b64_transport{wrapped = NewWrapped}, ok}.
+close(Me = #b64_transport{wrapped = Wrapped}) ->
+ flush(Me),
+ thrift_transport:close(Wrapped).
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl
index 800fd8e..ad53384 100644
--- a/lib/erl/src/thrift_binary_protocol.erl
+++ b/lib/erl/src/thrift_binary_protocol.erl
@@ -19,7 +19,7 @@
-module(thrift_binary_protocol).
--behaviour(thrift_protocol).
+-behavior(thrift_protocol).
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
@@ -37,8 +37,6 @@
strict_read=true,
strict_write=true
}).
--type state() :: #binary_protocol{}.
--include("thrift_protocol_behaviour.hrl").
-define(VERSION_MASK, 16#FFFF0000).
-define(VERSION_1, 16#80010000).
@@ -60,81 +58,79 @@
parse_options(Rest, State#binary_protocol{strict_write=Bool}).
-flush_transport(This = #binary_protocol{transport = Transport}) ->
- {NewTransport, Result} = thrift_transport:flush(Transport),
- {This#binary_protocol{transport = NewTransport}, Result}.
+flush_transport(#binary_protocol{transport = Transport}) ->
+ thrift_transport:flush(Transport).
-close_transport(This = #binary_protocol{transport = Transport}) ->
- {NewTransport, Result} = thrift_transport:close(Transport),
- {This#binary_protocol{transport = NewTransport}, Result}.
+close_transport(#binary_protocol{transport = Transport}) ->
+ thrift_transport:close(Transport).
%%%
%%% instance methods
%%%
-write(This0, #protocol_message_begin{
+write(This, #protocol_message_begin{
name = Name,
type = Type,
seqid = Seqid}) ->
- case This0#binary_protocol.strict_write of
+ case This#binary_protocol.strict_write of
true ->
- {This1, ok} = write(This0, {i32, ?VERSION_1 bor Type}),
- {This2, ok} = write(This1, {string, Name}),
- {This3, ok} = write(This2, {i32, Seqid}),
- {This3, ok};
+ write(This, {i32, ?VERSION_1 bor Type}),
+ write(This, {string, Name}),
+ write(This, {i32, Seqid});
false ->
- {This1, ok} = write(This0, {string, Name}),
- {This2, ok} = write(This1, {byte, Type}),
- {This3, ok} = write(This2, {i32, Seqid}),
- {This3, ok}
- end;
+ write(This, {string, Name}),
+ write(This, {byte, Type}),
+ write(This, {i32, Seqid})
+ end,
+ ok;
-write(This, message_end) -> {This, ok};
+write(This, message_end) -> ok;
-write(This0, #protocol_field_begin{
+write(This, #protocol_field_begin{
name = _Name,
type = Type,
id = Id}) ->
- {This1, ok} = write(This0, {byte, Type}),
- {This2, ok} = write(This1, {i16, Id}),
- {This2, ok};
+ write(This, {byte, Type}),
+ write(This, {i16, Id}),
+ ok;
write(This, field_stop) ->
- write(This, {byte, ?tType_STOP});
+ write(This, {byte, ?tType_STOP}),
+ ok;
-write(This, field_end) -> {This, ok};
+write(This, field_end) -> ok;
-write(This0, #protocol_map_begin{
+write(This, #protocol_map_begin{
ktype = Ktype,
vtype = Vtype,
size = Size}) ->
- {This1, ok} = write(This0, {byte, Ktype}),
- {This2, ok} = write(This1, {byte, Vtype}),
- {This3, ok} = write(This2, {i32, Size}),
- {This3, ok};
+ write(This, {byte, Ktype}),
+ write(This, {byte, Vtype}),
+ write(This, {i32, Size}),
+ ok;
-write(This, map_end) -> {This, ok};
+write(This, map_end) -> ok;
-write(This0, #protocol_list_begin{
+write(This, #protocol_list_begin{
etype = Etype,
size = Size}) ->
- {This1, ok} = write(This0, {byte, Etype}),
- {This2, ok} = write(This1, {i32, Size}),
- {This2, ok};
+ write(This, {byte, Etype}),
+ write(This, {i32, Size}),
+ ok;
-write(This, list_end) -> {This, ok};
+write(This, list_end) -> ok;
-write(This0, #protocol_set_begin{
+write(This, #protocol_set_begin{
etype = Etype,
size = Size}) ->
- {This1, ok} = write(This0, {byte, Etype}),
- {This2, ok} = write(This1, {i32, Size}),
- {This2, ok};
+ write(This, {byte, Etype}),
+ write(This, {i32, Size}),
+ ok;
-write(This, set_end) -> {This, ok};
+write(This, set_end) -> ok;
-write(This, #protocol_struct_begin{}) -> {This, ok};
-write(This, struct_end) -> {This, ok};
+write(This, #protocol_struct_begin{}) -> ok;
+write(This, struct_end) -> ok;
write(This, {bool, true}) -> write(This, {byte, 1});
write(This, {bool, false}) -> write(This, {byte, 0});
@@ -154,166 +150,152 @@
write(This, {double, Double}) ->
write(This, <<Double:64/big-signed-float>>);
-write(This0, {string, Str}) when is_list(Str) ->
- {This1, ok} = write(This0, {i32, length(Str)}),
- {This2, ok} = write(This1, list_to_binary(Str)),
- {This2, ok};
+write(This, {string, Str}) when is_list(Str) ->
+ write(This, {i32, length(Str)}),
+ write(This, list_to_binary(Str));
-write(This0, {string, Bin}) when is_binary(Bin) ->
- {This1, ok} = write(This0, {i32, size(Bin)}),
- {This2, ok} = write(This1, Bin),
- {This2, ok};
+write(This, {string, Bin}) when is_binary(Bin) ->
+ write(This, {i32, size(Bin)}),
+ write(This, Bin);
%% Data :: iolist()
-write(This = #binary_protocol{transport = Trans}, Data) ->
- {NewTransport, Result} = thrift_transport:write(Trans, Data),
- {This#binary_protocol{transport = NewTransport}, Result}.
+write(This, Data) ->
+ thrift_transport:write(This#binary_protocol.transport, Data).
%%
-read(This0, message_begin) ->
- {This1, Initial} = read(This0, ui32),
- case Initial of
+read(This, message_begin) ->
+ case read(This, ui32) of
{ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
%% we're at version 1
- {This2, {ok, Name}} = read(This1, string),
- {This3, {ok, SeqId}} = read(This2, i32),
- Type = Sz band ?TYPE_MASK,
- {This3, #protocol_message_begin{name = binary_to_list(Name),
- type = Type,
- seqid = SeqId}};
+ {ok, Name} = read(This, string),
+ Type = Sz band ?TYPE_MASK,
+ {ok, SeqId} = read(This, i32),
+ #protocol_message_begin{name = binary_to_list(Name),
+ type = Type,
+ seqid = SeqId};
{ok, Sz} when Sz < 0 ->
%% there's a version number but it's unexpected
- {This1, {error, {bad_binary_protocol_version, Sz}}};
+ {error, {bad_binary_protocol_version, Sz}};
- {ok, _Sz} when This1#binary_protocol.strict_read =:= true ->
+ {ok, Sz} when This#binary_protocol.strict_read =:= true ->
%% strict_read is true and there's no version header; that's an error
- {This1, {error, no_binary_protocol_version}};
+ {error, no_binary_protocol_version};
- {ok, Sz} when This1#binary_protocol.strict_read =:= false ->
+ {ok, Sz} when This#binary_protocol.strict_read =:= false ->
%% strict_read is false, so just read the old way
- {This2, {ok, Name}} = read_data(This1, Sz),
- {This3, {ok, Type}} = read(This2, byte),
- {This4, {ok, SeqId}} = read(This3, i32),
- {This4, #protocol_message_begin{name = binary_to_list(Name),
- type = Type,
- seqid = SeqId}};
+ {ok, Name} = read(This, Sz),
+ {ok, Type} = read(This, byte),
+ {ok, SeqId} = read(This, i32),
+ #protocol_message_begin{name = binary_to_list(Name),
+ type = Type,
+ seqid = SeqId};
- Else ->
- {This1, Else}
+ Err = {error, closed} -> Err;
+ Err = {error, timeout}-> Err;
+ Err = {error, ebadf} -> Err
end;
-read(This, message_end) -> {This, ok};
+read(This, message_end) -> ok;
-read(This, struct_begin) -> {This, ok};
-read(This, struct_end) -> {This, ok};
+read(This, struct_begin) -> ok;
+read(This, struct_end) -> ok;
-read(This0, field_begin) ->
- {This1, Result} = read(This0, byte),
- case Result of
+read(This, field_begin) ->
+ case read(This, byte) of
{ok, Type = ?tType_STOP} ->
- {This1, #protocol_field_begin{type = Type}};
+ #protocol_field_begin{type = Type};
{ok, Type} ->
- {This2, {ok, Id}} = read(This1, i16),
- {This2, #protocol_field_begin{type = Type,
- id = Id}}
+ {ok, Id} = read(This, i16),
+ #protocol_field_begin{type = Type,
+ id = Id}
end;
-read(This, field_end) -> {This, ok};
+read(This, field_end) -> ok;
-read(This0, map_begin) ->
- {This1, {ok, Ktype}} = read(This0, byte),
- {This2, {ok, Vtype}} = read(This1, byte),
- {This3, {ok, Size}} = read(This2, i32),
- {This3, #protocol_map_begin{ktype = Ktype,
- vtype = Vtype,
- size = Size}};
-read(This, map_end) -> {This, ok};
+read(This, map_begin) ->
+ {ok, Ktype} = read(This, byte),
+ {ok, Vtype} = read(This, byte),
+ {ok, Size} = read(This, i32),
+ #protocol_map_begin{ktype = Ktype,
+ vtype = Vtype,
+ size = Size};
+read(This, map_end) -> ok;
-read(This0, list_begin) ->
- {This1, {ok, Etype}} = read(This0, byte),
- {This2, {ok, Size}} = read(This1, i32),
- {This2, #protocol_list_begin{etype = Etype,
- size = Size}};
-read(This, list_end) -> {This, ok};
+read(This, list_begin) ->
+ {ok, Etype} = read(This, byte),
+ {ok, Size} = read(This, i32),
+ #protocol_list_begin{etype = Etype,
+ size = Size};
+read(This, list_end) -> ok;
-read(This0, set_begin) ->
- {This1, {ok, Etype}} = read(This0, byte),
- {This2, {ok, Size}} = read(This1, i32),
- {This2, #protocol_set_begin{etype = Etype,
- size = Size}};
-read(This, set_end) -> {This, ok};
+read(This, set_begin) ->
+ {ok, Etype} = read(This, byte),
+ {ok, Size} = read(This, i32),
+ #protocol_set_begin{etype = Etype,
+ size = Size};
+read(This, set_end) -> ok;
-read(This0, field_stop) ->
- {This1, {ok, ?tType_STOP}} = read(This0, byte),
- {This1, ok};
+read(This, field_stop) ->
+ {ok, ?tType_STOP} = read(This, byte),
+ ok;
%%
-read(This0, bool) ->
- {This1, Result} = read(This0, byte),
- case Result of
- {ok, Byte} -> {This1, {ok, Byte /= 0}};
- Else -> {This1, Else}
+read(This, bool) ->
+ case read(This, byte) of
+ {ok, Byte} -> {ok, Byte /= 0};
+ Else -> Else
end;
-read(This0, byte) ->
- {This1, Bytes} = read_data(This0, 1),
- case Bytes of
- {ok, <<Val:8/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, byte) ->
+ case read(This, 1) of
+ {ok, <<Val:8/integer-signed-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
-read(This0, i16) ->
- {This1, Bytes} = read_data(This0, 2),
- case Bytes of
- {ok, <<Val:16/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, i16) ->
+ case read(This, 2) of
+ {ok, <<Val:16/integer-signed-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
-read(This0, i32) ->
- {This1, Bytes} = read_data(This0, 4),
- case Bytes of
- {ok, <<Val:32/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, i32) ->
+ case read(This, 4) of
+ {ok, <<Val:32/integer-signed-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
%% unsigned ints aren't used by thrift itself, but it's used for the parsing
%% of the packet version header. Without this special function BEAM works fine
%% but hipe thinks it received a bad version header.
-read(This0, ui32) ->
- {This1, Bytes} = read_data(This0, 4),
- case Bytes of
- {ok, <<Val:32/integer-unsigned-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, ui32) ->
+ case read(This, 4) of
+ {ok, <<Val:32/integer-unsigned-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
-read(This0, i64) ->
- {This1, Bytes} = read_data(This0, 8),
- case Bytes of
- {ok, <<Val:64/integer-signed-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, i64) ->
+ case read(This, 8) of
+ {ok, <<Val:64/integer-signed-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
-read(This0, double) ->
- {This1, Bytes} = read_data(This0, 8),
- case Bytes of
- {ok, <<Val:64/float-signed-big, _/binary>>} -> {This1, {ok, Val}};
- Else -> {This1, Else}
+read(This, double) ->
+ case read(This, 8) of
+ {ok, <<Val:64/float-signed-big, _/binary>>} -> {ok, Val};
+ Else -> Else
end;
% returns a binary directly, call binary_to_list if necessary
-read(This0, string) ->
- {This1, {ok, Sz}} = read(This0, i32),
- read_data(This1, Sz).
+read(This, string) ->
+ {ok, Sz} = read(This, i32),
+ {ok, Bin} = read(This, Sz);
--spec read_data(#binary_protocol{}, non_neg_integer()) ->
- {#binary_protocol{}, {ok, binary()} | {error, _Reason}}.
-read_data(This, 0) -> {This, {ok, <<>>}};
-read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 ->
- {NewTransport, Result} = thrift_transport:read(Trans, Len),
- {This#binary_protocol{transport = NewTransport}, Result}.
+read(This, 0) -> {ok, <<>>};
+read(This, Len) when is_integer(Len), Len >= 0 ->
+ thrift_transport:read(This#binary_protocol.transport, Len).
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl
index d4d614e..ebc16bd 100644
--- a/lib/erl/src/thrift_buffered_transport.erl
+++ b/lib/erl/src/thrift_buffered_transport.erl
@@ -19,51 +19,154 @@
-module(thrift_buffered_transport).
+-behaviour(gen_server).
-behaviour(thrift_transport).
%% API
-export([new/1, new_transport_factory/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
%% thrift_transport callbacks
-export([write/2, read/2, flush/1, close/1]).
-record(buffered_transport, {wrapped, % a thrift_transport
write_buffer % iolist()
}).
--type state() :: #buffered_transport{}.
--include("thrift_transport_behaviour.hrl").
-
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
new(WrappedTransport) ->
- State = #buffered_transport{wrapped = WrappedTransport,
- write_buffer = []},
- thrift_transport:new(?MODULE, State).
+ case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
-%% Writes data into the buffer
-write(State = #buffered_transport{write_buffer = WBuf}, Data) ->
- {State#buffered_transport{write_buffer = [WBuf, Data]}, ok}.
-%% Flushes the buffer through to the wrapped transport
-flush(State = #buffered_transport{write_buffer = WBuf,
- wrapped = Wrapped0}) ->
- {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf),
- {Wrapped2, _} = thrift_transport:flush(Wrapped1),
- NewState = State#buffered_transport{write_buffer = [],
- wrapped = Wrapped2},
- {NewState, Response}.
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+ gen_server:call(Transport, {write, Data}).
-%% Closes the transport and the wrapped transport
-close(State = #buffered_transport{wrapped = Wrapped0}) ->
- {Wrapped1, Result} = thrift_transport:close(Wrapped0),
- NewState = State#buffered_transport{wrapped = Wrapped1},
- {NewState, Result}.
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, flush).
-%% Reads data through from the wrapped transport
-read(State = #buffered_transport{wrapped = Wrapped0}, Len) when is_integer(Len) ->
- {Wrapped1, Response} = thrift_transport:read(Wrapped0, Len),
- NewState = State#buffered_transport{wrapped = Wrapped1},
- {NewState, Response}.
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+ gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}, _Timeout=10000).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+ {ok, #buffered_transport{wrapped = Wrapped,
+ write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) ->
+ {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
+ Response = thrift_transport:read(Wrapped, Len),
+ {reply, Response, State};
+
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
+ wrapped = Wrapped}) ->
+ Response = thrift_transport:write(Wrapped, WBuf),
+ thrift_transport:flush(Wrapped),
+ {reply, Response, State#buffered_transport{write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
+ wrapped = Wrapped}) ->
+ thrift_transport:write(Wrapped, WBuf),
+ %% Wrapped is closed by terminate/2
+ %% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
+ {stop, normal, State};
+handle_cast(Msg, State=#buffered_transport{}) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) ->
+ thrift_transport:close(Wrapped),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
index 5c74adc..d5bb146 100644
--- a/lib/erl/src/thrift_client.erl
+++ b/lib/erl/src/thrift_client.erl
@@ -19,127 +19,366 @@
-module(thrift_client).
+-behaviour(gen_server).
+
%% API
--export([new/2, call/3, send_call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4,
+ start/3, start/4,
+ call/3, send_call/3, close/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(tclient, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as a linked process.
+%%--------------------------------------------------------------------
+start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
+ start_link(Host, Port, Service, []).
+
+start_link(Host, Port, Service, Options) ->
+ start(Host, Port, Service, [{monitor, link} | Options]).
+
+start_link(ProtocolFactory, Service) ->
+ start(ProtocolFactory, Service, [{monitor, link}]).
+
+%%
+%% Splits client options into protocol options and transport options
+%%
+%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
+%%
+split_options(Options) ->
+ split_options(Options, [], [], []).
+
+split_options([], ClientIn, ProtoIn, TransIn) ->
+ {ClientIn, ProtoIn, TransIn};
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= monitor ->
+ split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= strict_read;
+ OptKey =:= strict_write ->
+ split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= framed;
+ OptKey =:= connect_timeout;
+ OptKey =:= sockopts ->
+ split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
-new(Protocol, Service)
- when is_atom(Service) ->
- {ok, #tclient{protocol = Protocol,
- service = Service,
- seqid = 0}}.
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as an unlinked process.
+%%--------------------------------------------------------------------
--spec call(#tclient{}, atom(), list()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-call(Client = #tclient{}, Function, Args)
- when is_atom(Function), is_list(Args) ->
- case send_function_call(Client, Function, Args) of
- {Client1, ok} ->
- receive_function_result(Client1, Function);
- Else ->
- Else
+%% Backwards-compatible starter for the common-case of socket transports
+start(Host, Port, Service, Options)
+ when is_integer(Port), is_atom(Service), is_list(Options) ->
+ {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
+
+ {ok, TransportFactory} =
+ thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
+
+ {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
+ TransportFactory, ProtoOpts),
+
+ start(ProtocolFactory, Service, ClientOpts).
+
+
+%% ProtocolFactory :: fun() -> thrift_protocol()
+start(ProtocolFactory, Service, ClientOpts)
+ when is_function(ProtocolFactory), is_atom(Service) ->
+ {Starter, Opts} =
+ case lists:keysearch(monitor, 1, ClientOpts) of
+ {value, {monitor, link}} ->
+ {start_link, []};
+ {value, {monitor, tether}} ->
+ {start, [{tether, self()}]};
+ _ ->
+ {start, []}
+ end,
+
+ Connect =
+ case lists:keysearch(connect, 1, ClientOpts) of
+ {value, {connect, Choice}} ->
+ Choice;
+ _ ->
+ %% By default, connect at creation-time.
+ true
+ end,
+
+
+ Started = gen_server:Starter(?MODULE, [Service, Opts], []),
+
+ if
+ Connect ->
+ case Started of
+ {ok, Pid} ->
+ case gen_server:call(Pid, {connect, ProtocolFactory}) of
+ ok ->
+ {ok, Pid};
+ Error ->
+ Error
+ end;
+ Else ->
+ Else
+ end;
+ true ->
+ Started
end.
+call(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ case gen_server:call(Client, {call, Function, Args}) of
+ R = {ok, _} -> R;
+ R = {error, _} -> R;
+ {exception, Exception} -> throw(Exception)
+ end.
+
+cast(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ gen_server:cast(Client, {call, Function, Args}).
%% Sends a function call but does not read the result. This is useful
%% if you're trying to log non-oneway function calls to write-only
%% transports like thrift_disk_log_transport.
--spec send_call(#tclient{}, atom(), list()) -> {#tclient{}, ok}.
-send_call(Client = #tclient{}, Function, Args)
- when is_atom(Function), is_list(Args) ->
- send_function_call(Client, Function, Args).
+send_call(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ gen_server:call(Client, {send_call, Function, Args}).
--spec close(#tclient{}) -> ok.
-close(#tclient{protocol=Protocol}) ->
- thrift_protocol:close_transport(Protocol).
+close(Client) when is_pid(Client) ->
+ gen_server:cast(Client, close).
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Service, Opts]) ->
+ case lists:keysearch(tether, 1, Opts) of
+ {value, {tether, Pid}} ->
+ erlang:monitor(process, Pid);
+ _Else ->
+ ok
+ end,
+ {ok, #state{service = Service}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({connect, ProtocolFactory}, _From,
+ State = #state{service = Service}) ->
+ case ProtocolFactory() of
+ {ok, Protocol} ->
+ {reply, ok, State#state{protocol = Protocol,
+ seqid = 0}};
+ Error ->
+ {stop, normal, Error, State}
+ end;
+
+handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
+ Result = catch_function_exceptions(
+ fun() ->
+ ok = send_function_call(State, Function, Args),
+ receive_function_result(State, Function)
+ end,
+ Service),
+ {reply, Result, State};
+
+
+handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
+ Result = catch_function_exceptions(
+ fun() ->
+ send_function_call(State, Function, Args)
+ end,
+ Service),
+ {reply, Result, State}.
+
+
+%% Helper function that catches exceptions thrown by sending or receiving
+%% a function and returns the correct response for call or send_only above.
+catch_function_exceptions(Fun, Service) ->
+ try
+ Fun()
+ catch
+ throw:{return, Return} ->
+ Return;
+ error:function_clause ->
+ ST = erlang:get_stacktrace(),
+ case hd(ST) of
+ {Service, function_info, [Function, _]} ->
+ {error, {no_function, Function}};
+ _ -> throw({error, {function_clause, ST}})
+ end
+ end.
+
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({call, Function, Args}, State = #state{service = Service,
+ protocol = Protocol,
+ seqid = SeqId}) ->
+ _Result =
+ try
+ ok = send_function_call(State, Function, Args),
+ receive_function_result(State, Function)
+ catch
+ Class:Reason ->
+ error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
+ end,
+
+ {noreply, State};
+
+handle_cast(close, State=#state{protocol = Protocol}) ->
+%% error_logger:info_msg("thrift_client ~p received close", [self()]),
+ {stop,normal,State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
+ when is_reference(MonitorRef), is_pid(Pid) ->
+ %% We don't actually verify the correctness of the DOWN message.
+ {stop, parent_died, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(Reason, State = #state{protocol=undefined}) ->
+ ok;
+terminate(Reason, State = #state{protocol=Protocol}) ->
+ thrift_protocol:close_transport(Protocol),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
--spec send_function_call(#tclient{}, atom(), list()) -> {#tclient{}, ok | {error, any()}}.
-send_function_call(Client = #tclient{protocol = Proto0,
- service = Service,
- seqid = SeqId},
+send_function_call(#state{protocol = Proto,
+ service = Service,
+ seqid = SeqId},
Function,
Args) ->
Params = Service:function_info(Function, params_type),
- case Params of
- no_function ->
- {Client, {error, {no_function, Function}}};
- {struct, PList} when length(PList) =/= length(Args) ->
- {Client, {error, {bad_args, Function, Args}}};
- {struct, _PList} ->
- Begin = #protocol_message_begin{name = atom_to_list(Function),
- type = ?tMessageType_CALL,
- seqid = SeqId},
- {Proto1, ok} = thrift_protocol:write(Proto0, Begin),
- {Proto2, ok} = thrift_protocol:write(Proto1, {Params, list_to_tuple([Function | Args])}),
- {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
- {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
- {Client#tclient{protocol = Proto4}, ok}
- end.
+ {struct, PList} = Params,
+ if
+ length(PList) =/= length(Args) ->
+ throw({return, {error, {bad_args, Function, Args}}});
+ true -> ok
+ end,
--spec receive_function_result(#tclient{}, atom()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-receive_function_result(Client = #tclient{service = Service}, Function) ->
+ Begin = #protocol_message_begin{name = atom_to_list(Function),
+ type = ?tMessageType_CALL,
+ seqid = SeqId},
+ ok = thrift_protocol:write(Proto, Begin),
+ ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
+ ok = thrift_protocol:write(Proto, message_end),
+ thrift_protocol:flush_transport(Proto),
+ ok.
+
+receive_function_result(State = #state{protocol = Proto,
+ service = Service},
+ Function) ->
ResultType = Service:function_info(Function, reply_type),
- read_result(Client, Function, ResultType).
+ read_result(State, Function, ResultType).
-read_result(Client, _Function, oneway_void) ->
- {Client, {ok, ok}};
+read_result(_State,
+ _Function,
+ oneway_void) ->
+ {ok, ok};
-read_result(Client = #tclient{protocol = Proto0,
- seqid = SeqId},
+read_result(State = #state{protocol = Proto,
+ seqid = SeqId},
Function,
ReplyType) ->
- {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
- NewClient = Client#tclient{protocol = Proto1},
- case MessageBegin of
+ case thrift_protocol:read(Proto, message_begin) of
#protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId ->
- {NewClient, {error, {bad_seq_id, SeqId}}};
+ {error, {bad_seq_id, SeqId}};
#protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
- handle_application_exception(NewClient);
+ handle_application_exception(State);
#protocol_message_begin{type = ?tMessageType_REPLY} ->
- handle_reply(NewClient, Function, ReplyType)
+ handle_reply(State, Function, ReplyType)
end.
-
-handle_reply(Client = #tclient{protocol = Proto0,
- service = Service},
+handle_reply(State = #state{protocol = Proto,
+ service = Service},
Function,
ReplyType) ->
{struct, ExceptionFields} = Service:function_info(Function, exceptions),
ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields},
- {Proto1, {ok, Reply}} = thrift_protocol:read(Proto0, ReplyStructDef),
- {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
- NewClient = Client#tclient{protocol = Proto2},
+ {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
ReplyList = tuple_to_list(Reply),
true = length(ReplyList) == length(ExceptionFields) + 1,
ExceptionVals = tl(ReplyList),
Thrown = [X || X <- ExceptionVals,
X =/= undefined],
- case Thrown of
- [] when ReplyType == {struct, []} ->
- {NewClient, {ok, ok}};
- [] ->
- {NewClient, {ok, hd(ReplyList)}};
- [Exception] ->
- throw({NewClient, {exception, Exception}})
- end.
+ Result =
+ case Thrown of
+ [] when ReplyType == {struct, []} ->
+ {ok, ok};
+ [] ->
+ {ok, hd(ReplyList)};
+ [Exception] ->
+ {exception, Exception}
+ end,
+ ok = thrift_protocol:read(Proto, message_end),
+ Result.
-handle_application_exception(Client = #tclient{protocol = Proto0}) ->
- {Proto1, {ok, Exception}} =
- thrift_protocol:read(Proto0, ?TApplicationException_Structure),
- {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
+handle_application_exception(State = #state{protocol = Proto}) ->
+ {ok, Exception} = thrift_protocol:read(Proto,
+ ?TApplicationException_Structure),
+ ok = thrift_protocol:read(Proto, message_end),
XRecord = list_to_tuple(
['TApplicationException' | tuple_to_list(Exception)]),
error_logger:error_msg("X: ~p~n", [XRecord]),
true = is_record(XRecord, 'TApplicationException'),
- NewClient = Client#tclient{protocol = Proto2},
- throw({NewClient, {exception, XRecord}}).
+ {exception, XRecord}.
diff --git a/lib/erl/src/thrift_client_util.erl b/lib/erl/src/thrift_client_util.erl
deleted file mode 100644
index c52bb8b..0000000
--- a/lib/erl/src/thrift_client_util.erl
+++ /dev/null
@@ -1,61 +0,0 @@
-%%
-%% Licensed to the Apache Software Foundation (ASF) under one
-%% or more contributor license agreements. See the NOTICE file
-%% distributed with this work for additional information
-%% regarding copyright ownership. The ASF licenses this file
-%% to you under the Apache License, Version 2.0 (the
-%% "License"); you may not use this file except in compliance
-%% with the License. You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-
--module(thrift_client_util).
-
--export([new/4]).
-
-%%
-%% Splits client options into client, protocol, and transport options
-%%
-%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
-%%
-split_options(Options) ->
- split_options(Options, [], []).
-
-split_options([], ProtoIn, TransIn) ->
- {ProtoIn, TransIn};
-
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
- when OptKey =:= strict_read;
- OptKey =:= strict_write ->
- split_options(Rest, [Opt | ProtoIn], TransIn);
-
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
- when OptKey =:= framed;
- OptKey =:= connect_timeout;
- OptKey =:= sockopts ->
- split_options(Rest, ProtoIn, [Opt | TransIn]).
-
-
-%% Client constructor for the common-case of socket transports
-%% with the binary protocol
-new(Host, Port, Service, Options)
- when is_integer(Port), is_atom(Service), is_list(Options) ->
- {ProtoOpts, TransOpts} = split_options(Options),
-
- {ok, TransportFactory} =
- thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
-
- {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
- TransportFactory, ProtoOpts),
-
- {ok, Protocol} = ProtocolFactory(),
-
- thrift_client:new(Protocol, Service).
diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl
index de8ee41..761fa30 100644
--- a/lib/erl/src/thrift_disk_log_transport.erl
+++ b/lib/erl/src/thrift_disk_log_transport.erl
@@ -35,8 +35,6 @@
close_on_close = false,
sync_every = infinity,
sync_tref}).
--type state() :: #dl_transport{}.
--include("thrift_transport_behaviour.hrl").
%% Create a transport attached to an already open log.
@@ -49,7 +47,7 @@
State2 =
case State#dl_transport.sync_every of
N when is_integer(N), N > 0 ->
- {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
+ {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, State),
State#dl_transport{sync_tref = TRef};
_ -> State
end,
@@ -60,41 +58,38 @@
parse_opts([], State) ->
State;
parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
- parse_opts(Rest, State#dl_transport{close_on_close = Bool});
+ State#dl_transport{close_on_close = Bool};
parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
- parse_opts(Rest, State#dl_transport{sync_every = Int}).
+ State#dl_transport{sync_every = Int}.
%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% disk_log_transport is write-only
-read(State, _Len) ->
- {State, {error, no_read_from_disk_log}}.
+read(_State, Len) ->
+ {error, no_read_from_disk_log}.
-write(This = #dl_transport{log = Log}, Data) ->
- {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.
+write(#dl_transport{log = Log}, Data) ->
+ disk_log:balog(Log, erlang:iolist_to_binary(Data)).
force_flush(#dl_transport{log = Log}) ->
error_logger:info_msg("~p syncing~n", [?MODULE]),
disk_log:sync(Log).
-flush(This = #dl_transport{log = Log, sync_every = SE}) ->
+flush(#dl_transport{log = Log, sync_every = SE}) ->
case SE of
undefined -> % no time-based sync
disk_log:sync(Log);
_Else -> % sync will happen automagically
ok
- end,
- {This, ok}.
-
-
+ end.
%% On close, close the underlying log if we're configured to do so.
-close(This = #dl_transport{close_on_close = false}) ->
- {This, ok};
-close(This = #dl_transport{log = Log}) ->
- {This, disk_log:lclose(Log)}.
+close(#dl_transport{close_on_close = false}) ->
+ ok;
+close(#dl_transport{log = Log}) ->
+ disk_log:lclose(Log).
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -114,10 +109,10 @@
ExtraLogOpts],
Log =
case disk_log:open(LogOpts) of
- {ok, LogS} ->
- LogS;
- {repaired, LogS, Info1, Info2} ->
- error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [LogS, Info1, Info2]),
- LogS
+ {ok, Log} ->
+ Log;
+ {repaired, Log, Info1, Info2} ->
+ error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]),
+ Log
end,
new(Log, TransportOpts).
diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl
index ba3aa89..5ac2dbe 100644
--- a/lib/erl/src/thrift_file_transport.erl
+++ b/lib/erl/src/thrift_file_transport.erl
@@ -29,8 +29,6 @@
-record(t_file_transport, {device,
should_close = true,
mode = write}).
--type state() :: #t_file_transport{}.
--include("thrift_transport_behaviour.hrl").
%%%% CONSTRUCTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -65,25 +63,25 @@
%%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-write(This = #t_file_transport{device = Device, mode = write}, Data) ->
- {This, file:write(Device, Data)};
-write(This, _D) ->
- {This, {error, read_mode}}.
+write(#t_file_transport{device = Device, mode = write}, Data) ->
+ file:write(Device, Data);
+write(_T, _D) ->
+ {error, read_mode}.
-read(This = #t_file_transport{device = Device, mode = read}, Len)
+read(#t_file_transport{device = Device, mode = read}, Len)
when is_integer(Len), Len >= 0 ->
- {This, file:read(Device, Len)};
-read(This, _D) ->
- {This, {error, read_mode}}.
+ file:read(Device, Len);
+read(_T, _D) ->
+ {error, read_mode}.
-flush(This = #t_file_transport{device = Device, mode = write}) ->
- {This, file:sync(Device)}.
+flush(#t_file_transport{device = Device, mode = write}) ->
+ file:sync(Device).
-close(This = #t_file_transport{device = Device, should_close = SC}) ->
+close(#t_file_transport{device = Device, should_close = SC}) ->
case SC of
true ->
- {This, file:close(Device)};
+ file:close(Device);
false ->
- {This, ok}
+ ok
end.
diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl
index 9b90112..01bab70 100644
--- a/lib/erl/src/thrift_framed_transport.erl
+++ b/lib/erl/src/thrift_framed_transport.erl
@@ -19,11 +19,16 @@
-module(thrift_framed_transport).
+-behaviour(gen_server).
-behaviour(thrift_transport).
%% API
-export([new/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
%% thrift_transport callbacks
-export([write/2, read/2, flush/1, close/1]).
@@ -31,55 +36,102 @@
read_buffer, % iolist()
write_buffer % iolist()
}).
--type state() :: #framed_transport{}.
--include("thrift_transport_behaviour.hrl").
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
new(WrappedTransport) ->
- State = #framed_transport{wrapped = WrappedTransport,
- read_buffer = [],
- write_buffer = []},
- thrift_transport:new(?MODULE, State).
+ case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
-%% Writes data into the buffer
-write(State = #framed_transport{write_buffer = WBuf}, Data) ->
- {State#framed_transport{write_buffer = [WBuf, Data]}, ok}.
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+ gen_server:call(Transport, {write, Data}).
-%% Flushes the buffer through to the wrapped transport
-flush(State0 = #framed_transport{write_buffer = Buffer,
- wrapped = Wrapped0}) ->
- FrameLen = iolist_size(Buffer),
- Data = [<<FrameLen:32/integer-signed-big>>, Buffer],
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, flush).
- {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data),
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+ gen_server:cast(Transport, close).
- {Wrapped2, _} = thrift_transport:flush(Wrapped1),
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
- State1 = State0#framed_transport{wrapped = Wrapped2, write_buffer = []},
- {State1, Response}.
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
-%% Closes the transport and the wrapped transport
-close(State = #framed_transport{wrapped = Wrapped0}) ->
- {Wrapped1, Result} = thrift_transport:close(Wrapped0),
- NewState = State#framed_transport{wrapped = Wrapped1},
- {NewState, Result}.
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+ {ok, #framed_transport{wrapped = Wrapped,
+ read_buffer = [],
+ write_buffer = []}}.
-%% Reads data through from the wrapped transport
-read(State0 = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf},
- Len) when is_integer(Len) ->
- {Wrapped1, {RBuf1, RBuf1Size}} =
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
+ {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+ read_buffer = RBuf}) ->
+ {RBuf1, RBuf1Size} =
%% if the read buffer is empty, read another frame
%% otherwise, just read from what's left in the buffer
case iolist_size(RBuf) of
0 ->
%% read the frame length
- {WrappedS1, {ok, <<FrameLen:32/integer-signed-big, _/binary>>}} =
- thrift_transport:read(Wrapped0, 4),
+ {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+ thrift_transport:read(Wrapped, 4),
%% then read the data
- {WrappedS2, {ok, Bin}} =
- thrift_transport:read(WrappedS1, FrameLen),
- {WrappedS2, {Bin, erlang:byte_size(Bin)}};
+ {ok, Bin} =
+ thrift_transport:read(Wrapped, FrameLen),
+ {Bin, erlang:byte_size(Bin)};
Sz ->
- {Wrapped0, {RBuf, Sz}}
+ {RBuf, Sz}
end,
%% pull off Give bytes, return them to the user, leave the rest in the buffer
@@ -87,13 +139,69 @@
<<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
Response = {ok, Data},
- State1 = State0#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2},
+ State1 = State#framed_transport{read_buffer=RBuf2},
- {State1, Response}.
+ {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+ {Response, State1} = do_flush(State),
+ {reply, Response, State1}.
%%--------------------------------------------------------------------
-%% Internal functions
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+ {_, State1} = do_flush(State),
+ %% Wrapped is closed by terminate/2
+ %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
+ {stop, normal, State};
+handle_cast(Msg, State=#framed_transport{}) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
+ thrift_transport:close(Wrapped),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+ wrapped = Wrapped}) ->
+ FrameLen = iolist_size(Buffer),
+ Data = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+ Response = thrift_transport:write(Wrapped, Data),
+
+ thrift_transport:flush(Wrapped),
+
+ State1 = State#framed_transport{write_buffer = []},
+ {Response, State1}.
min(A,B) when A<B -> A;
min(_,B) -> B.
diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl
index 09113cc..f8c1827 100644
--- a/lib/erl/src/thrift_http_transport.erl
+++ b/lib/erl/src/thrift_http_transport.erl
@@ -19,11 +19,20 @@
-module(thrift_http_transport).
+-behaviour(gen_server).
-behaviour(thrift_transport).
%% API
-export([new/2, new/3]).
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
%% thrift_transport callbacks
-export([write/2, read/2, flush/1, close/1]).
@@ -34,9 +43,14 @@
http_options, % see http(3)
extra_headers % [{str(), str()}, ...]
}).
--type state() :: pid().
--include("thrift_transport_behaviour.hrl").
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new() -> {ok, Transport} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
new(Host, Path) ->
new(Host, Path, _Options = []).
@@ -46,6 +60,54 @@
%% {extra_headers, ExtraHeaders} = List of extra HTTP headers
%%--------------------------------------------------------------------
new(Host, Path, Options) ->
+ case gen_server:start_link(?MODULE, {Host, Path, Options}, []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+ gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer, making a request
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+ gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+init({Host, Path, Options}) ->
State1 = #http_transport{host = Host,
path = Path,
read_buffer = [],
@@ -65,17 +127,50 @@
end,
case lists:foldl(ApplyOption, State1, Options) of
State2 = #http_transport{} ->
- thrift_transport:new(?MODULE, State2);
+ {ok, State2};
Else ->
- {error, Else}
+ {stop, Else}
end.
-%% Writes data into the buffer
-write(State = #http_transport{write_buffer = WBuf}, Data) ->
- {State#http_transport{write_buffer = [WBuf, Data]}, ok}.
+handle_call({write, Data}, _From, State = #http_transport{write_buffer = WBuf}) ->
+ {reply, ok, State#http_transport{write_buffer = [WBuf, Data]}};
-%% Flushes the buffer, making a request
-flush(State = #http_transport{host = Host,
+handle_call({read, Len}, _From, State = #http_transport{read_buffer = RBuf}) ->
+ %% Pull off Give bytes, return them to the user, leave the rest in the buffer.
+ Give = min(iolist_size(RBuf), Len),
+ case iolist_to_binary(RBuf) of
+ <<Data:Give/binary, RBuf1/binary>> ->
+ Response = {ok, Data},
+ State1 = State#http_transport{read_buffer=RBuf1},
+ {reply, Response, State1};
+ _ ->
+ {reply, {error, 'EOF'}, State}
+ end;
+
+handle_call(flush, _From, State) ->
+ {Response, State1} = do_flush(State),
+ {reply, Response, State1}.
+
+handle_cast(close, State) ->
+ {_, State1} = do_flush(State),
+ {stop, normal, State1};
+
+handle_cast(_Msg, State=#http_transport{}) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_flush(State = #http_transport{host = Host,
path = Path,
read_buffer = Rbuf,
write_buffer = Wbuf,
@@ -84,7 +179,7 @@
case iolist_to_binary(Wbuf) of
<<>> ->
%% Don't bother flushing empty buffers.
- {State, ok};
+ {ok, State};
WBinary ->
{ok, {{_Version, 200, _ReasonPhrase}, _Headers, Body}} =
http:request(post,
@@ -97,22 +192,7 @@
State1 = State#http_transport{read_buffer = [Rbuf, Body],
write_buffer = []},
- {State1, ok}
- end.
-
-close(State) ->
- {State, ok}.
-
-read(State = #http_transport{read_buffer = RBuf}, Len) when is_integer(Len) ->
- %% Pull off Give bytes, return them to the user, leave the rest in the buffer.
- Give = min(iolist_size(RBuf), Len),
- case iolist_to_binary(RBuf) of
- <<Data:Give/binary, RBuf1/binary>> ->
- Response = {ok, Data},
- State1 = State#http_transport{read_buffer=RBuf1},
- {State1, Response};
- _ ->
- {State, {error, 'EOF'}}
+ {ok, State1}
end.
min(A,B) when A<B -> A;
diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl
index c44449e..b4f607a 100644
--- a/lib/erl/src/thrift_memory_buffer.erl
+++ b/lib/erl/src/thrift_memory_buffer.erl
@@ -19,43 +19,145 @@
-module(thrift_memory_buffer).
+-behaviour(gen_server).
-behaviour(thrift_transport).
%% API
-export([new/0, new_transport_factory/0]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
%% thrift_transport callbacks
-export([write/2, read/2, flush/1, close/1]).
-record(memory_buffer, {buffer}).
--type state() :: #memory_buffer{}.
--include("thrift_transport_behaviour.hrl").
+%%====================================================================
+%% API
+%%====================================================================
new() ->
- State = #memory_buffer{buffer = []},
- thrift_transport:new(?MODULE, State).
+ case gen_server:start_link(?MODULE, [], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
new_transport_factory() ->
{ok, fun() -> new() end}.
-%% Writes data into the buffer
-write(State = #memory_buffer{buffer = Buf}, Data) ->
- {State#memory_buffer{buffer = [Buf, Data]}, ok}.
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+ gen_server:call(Transport, {write, Data}).
-flush(State) ->
- {State, ok}.
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, flush).
-close(State) ->
- {State, ok}.
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+ gen_server:cast(Transport, close).
-read(State = #memory_buffer{buffer = Buf}, Len) when is_integer(Len) ->
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+ {ok, #memory_buffer{buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #memory_buffer{buffer = Buf}) ->
+ {reply, ok, State#memory_buffer{buffer = [Buf, Data]}};
+
+handle_call({read, Len}, _From, State = #memory_buffer{buffer = Buf}) ->
Binary = iolist_to_binary(Buf),
Give = min(iolist_size(Binary), Len),
{Result, Remaining} = split_binary(Binary, Give),
- {State#memory_buffer{buffer = Remaining}, {ok, Result}}.
+ {reply, {ok, Result}, State#memory_buffer{buffer = Remaining}};
+
+handle_call(flush, _From, State) ->
+ {reply, ok, State}.
%%--------------------------------------------------------------------
-%% Internal functions
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+ {stop, normal, State};
+handle_cast(Msg, State=#memory_buffer{}) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
%%--------------------------------------------------------------------
min(A,B) when A<B -> A;
min(_,B) -> B.
diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl
index 4315505..e26fb33 100644
--- a/lib/erl/src/thrift_processor.erl
+++ b/lib/erl/src/thrift_processor.erl
@@ -24,54 +24,55 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(thrift_processor, {handler, protocol, service}).
+-record(thrift_processor, {handler, in_protocol, out_protocol, service}).
-init({_Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
- {ok, Proto} = ProtoGen(),
- loop(#thrift_processor{protocol = Proto,
+init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
+ {ok, IProt, OProt} = ProtoGen(),
+ loop(#thrift_processor{in_protocol = IProt,
+ out_protocol = OProt,
service = Service,
handler = Handler}).
-loop(State0 = #thrift_processor{protocol = Proto0}) ->
- {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
- State1 = State0#thrift_processor{protocol = Proto1},
- case MessageBegin of
+loop(State = #thrift_processor{in_protocol = IProto,
+ out_protocol = OProto}) ->
+ case thrift_protocol:read(IProto, message_begin) of
#protocol_message_begin{name = Function,
type = ?tMessageType_CALL} ->
- {State2, ok} = handle_function(State1, list_to_atom(Function)),
- loop(State2);
+ ok = handle_function(State, list_to_atom(Function)),
+ loop(State);
#protocol_message_begin{name = Function,
type = ?tMessageType_ONEWAY} ->
- {State2, ok} = handle_function(State1, list_to_atom(Function)),
- loop(State2);
+ ok = handle_function(State, list_to_atom(Function)),
+ loop(State);
{error, timeout} ->
- thrift_protocol:close_transport(Proto1),
+ thrift_protocol:close_transport(OProto),
ok;
{error, closed} ->
%% error_logger:info_msg("Client disconnected~n"),
- thrift_protocol:close_transport(Proto1),
+ thrift_protocol:close_transport(OProto),
exit(shutdown)
end.
-handle_function(State0=#thrift_processor{protocol = Proto0,
- handler = Handler,
- service = Service},
+handle_function(State=#thrift_processor{in_protocol = IProto,
+ out_protocol = OProto,
+ handler = Handler,
+ service = Service},
Function) ->
InParams = Service:function_info(Function, params_type),
- {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams),
- State1 = State0#thrift_processor{protocol = Proto1},
+ {ok, Params} = thrift_protocol:read(IProto, InParams),
try
Result = Handler:handle_function(Function, Params),
%% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]),
%% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n",
%% [Function, Params, Micro/1000.0]),
- handle_success(State1, Function, Result)
+ handle_success(State, Function, Result)
catch
- Type:Data when Type =:= throw orelse Type =:= error ->
- handle_function_catch(State1, Function, Type, Data)
- end.
+ Type:Data ->
+ handle_function_catch(State, Function, Type, Data)
+ end,
+ after_reply(OProto).
handle_function_catch(State = #thrift_processor{service = Service},
Function, ErrType, ErrData) ->
@@ -83,37 +84,39 @@
error_logger:warning_msg(
"oneway void ~p threw error which must be ignored: ~p",
[Function, {ErrType, ErrData, Stack}]),
- {State, ok};
+ ok;
{throw, Exception} when is_tuple(Exception), size(Exception) > 0 ->
- %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]),
- handle_exception(State, Function, Exception);
- % we still want to accept more requests from this client
+ error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]),
+ handle_exception(State, Function, Exception),
+ ok; % we still want to accept more requests from this client
{error, Error} ->
- handle_error(State, Function, Error)
+ ok = handle_error(State, Function, Error)
end.
-handle_success(State = #thrift_processor{service = Service},
+handle_success(State = #thrift_processor{out_protocol = OProto,
+ service = Service},
Function,
Result) ->
ReplyType = Service:function_info(Function, reply_type),
StructName = atom_to_list(Function) ++ "_result",
- case Result of
- {reply, ReplyData} ->
- Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
- send_reply(State, Function, ?tMessageType_REPLY, Reply);
+ ok = case Result of
+ {reply, ReplyData} ->
+ Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
+ send_reply(OProto, Function, ?tMessageType_REPLY, Reply);
- ok when ReplyType == {struct, []} ->
- send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
+ ok when ReplyType == {struct, []} ->
+ send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
- ok when ReplyType == oneway_void ->
- %% no reply for oneway void
- {State, ok}
- end.
+ ok when ReplyType == oneway_void ->
+ %% no reply for oneway void
+ ok
+ end.
-handle_exception(State = #thrift_processor{service = Service},
+handle_exception(State = #thrift_processor{out_protocol = OProto,
+ service = Service},
Function,
Exception) ->
ExceptionType = element(1, Exception),
@@ -138,9 +141,9 @@
% Make sure we got at least one defined
case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of
true ->
- handle_unknown_exception(State, Function, Exception);
+ ok = handle_unknown_exception(State, Function, Exception);
false ->
- send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
+ ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple})
end.
%%
@@ -151,7 +154,7 @@
handle_error(State, Function, {exception_not_declared_as_thrown,
Exception}).
-handle_error(State, Function, Error) ->
+handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) ->
Stack = erlang:get_stacktrace(),
error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]),
@@ -167,14 +170,19 @@
#'TApplicationException'{
message = Message,
type = ?TApplicationException_UNKNOWN}},
- send_reply(State, Function, ?tMessageType_EXCEPTION, Reply).
+ send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply).
-send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) ->
- {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{
- name = atom_to_list(Function),
- type = ReplyMessageType,
- seqid = 0}),
- {Proto2, ok} = thrift_protocol:write(Proto1, Reply),
- {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
- {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
- {State#thrift_processor{protocol = Proto4}, ok}.
+send_reply(OProto, Function, ReplyMessageType, Reply) ->
+ ok = thrift_protocol:write(OProto, #protocol_message_begin{
+ name = atom_to_list(Function),
+ type = ReplyMessageType,
+ seqid = 0}),
+ ok = thrift_protocol:write(OProto, Reply),
+ ok = thrift_protocol:write(OProto, message_end),
+ ok = thrift_protocol:flush_transport(OProto),
+ ok.
+
+after_reply(OProto) ->
+ ok = thrift_protocol:flush_transport(OProto)
+ %% ok = thrift_protocol:close_transport(OProto)
+ .
diff --git a/lib/erl/src/thrift_protocol.erl b/lib/erl/src/thrift_protocol.erl
index 4c33412..1bfb0a4 100644
--- a/lib/erl/src/thrift_protocol.erl
+++ b/lib/erl/src/thrift_protocol.erl
@@ -49,13 +49,10 @@
{ok, #protocol{module = Module,
data = Data}}.
--spec flush_transport(#protocol{}) -> {#protocol{}, ok}.
-flush_transport(Proto = #protocol{module = Module,
- data = Data}) ->
- {NewData, Result} = Module:flush_transport(Data),
- {Proto#protocol{data = NewData}, Result}.
+flush_transport(#protocol{module = Module,
+ data = Data}) ->
+ Module:flush_transport(Data).
--spec close_transport(#protocol{}) -> ok.
close_transport(#protocol{module = Module,
data = Data}) ->
Module:close_transport(Data).
@@ -89,8 +86,7 @@
%% Structure is like:
%% [{Fid, Type}, ...]
--spec read(#protocol{}, {struct, _StructDef}, atom()) -> {#protocol{}, {ok, tuple()}}.
-read(IProto0, {struct, Structure}, Tag)
+read(IProto, {struct, Structure}, Tag)
when is_list(Structure), is_atom(Tag) ->
% If we want a tagged tuple, we need to offset all the tuple indices
@@ -107,23 +103,14 @@
% Fid -> {Type, Index}
SDict = dict:from_list(SWithIndices),
- {IProto1, ok} = read(IProto0, struct_begin),
+ ok = read(IProto, struct_begin),
RTuple0 = erlang:make_tuple(length(Structure) + Offset, undefined),
RTuple1 = if Tag =/= undefined -> setelement(1, RTuple0, Tag);
true -> RTuple0
end,
- {IProto2, RTuple2} = read_struct_loop(IProto1, SDict, RTuple1),
- {IProto2, {ok, RTuple2}}.
-
-
-%% NOTE: Keep this in sync with thrift_protocol_behaviour:read
--spec read
- (#protocol{}, {struct, _Info}) -> {#protocol{}, {ok, tuple()} | {error, _Reason}};
- (#protocol{}, tprot_cont_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}};
- (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}};
- (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}};
- (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}}.
+ RTuple2 = read_struct_loop(IProto, SDict, RTuple1),
+ {ok, RTuple2}.
read(IProto, {struct, {Module, StructureName}}) when is_atom(Module),
is_atom(StructureName) ->
@@ -132,165 +119,137 @@
read(IProto, S={struct, Structure}) when is_list(Structure) ->
read(IProto, S, undefined);
-read(IProto0, {list, Type}) ->
- {IProto1, #protocol_list_begin{etype = EType, size = Size}} =
- read(IProto0, list_begin),
- {EType, EType} = {term_to_typeid(Type), EType},
- {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) ->
- {ProtoS1, {ok, Item}} = read(ProtoS0, Type),
- {Item, ProtoS1}
- end,
- IProto1,
- lists:duplicate(Size, 0)),
- {IProto3, ok} = read(IProto2, list_end),
- {IProto3, {ok, List}};
+read(IProto, {list, Type}) ->
+ #protocol_list_begin{etype = EType, size = Size} =
+ read(IProto, list_begin),
+ List = [Result || {ok, Result} <-
+ [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]],
+ ok = read(IProto, list_end),
+ {ok, List};
-read(IProto0, {map, KeyType, ValType}) ->
- {IProto1, #protocol_map_begin{size = Size, ktype = KType, vtype = VType}} =
- read(IProto0, map_begin),
- {KType, KType} = {term_to_typeid(KeyType), KType},
- {VType, VType} = {term_to_typeid(ValType), VType},
- {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) ->
- {ProtoS1, {ok, Key}} = read(ProtoS0, KeyType),
- {ProtoS2, {ok, Val}} = read(ProtoS1, ValType),
- {{Key, Val}, ProtoS2}
- end,
- IProto1,
- lists:duplicate(Size, 0)),
- {IProto3, ok} = read(IProto2, map_end),
- {IProto3, {ok, dict:from_list(List)}};
+read(IProto, {map, KeyType, ValType}) ->
+ #protocol_map_begin{size = Size} =
+ read(IProto, map_begin),
-read(IProto0, {set, Type}) ->
- {IProto1, #protocol_set_begin{etype = EType, size = Size}} =
- read(IProto0, set_begin),
- {EType, EType} = {term_to_typeid(Type), EType},
- {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) ->
- {ProtoS1, {ok, Item}} = read(ProtoS0, Type),
- {Item, ProtoS1}
- end,
- IProto1,
- lists:duplicate(Size, 0)),
- {IProto3, ok} = read(IProto2, set_end),
- {IProto3, {ok, sets:from_list(List)}};
+ List = [{Key, Val} || {{ok, Key}, {ok, Val}} <-
+ [{read(IProto, KeyType),
+ read(IProto, ValType)} || _X <- lists:duplicate(Size, 0)]],
+ ok = read(IProto, map_end),
+ {ok, dict:from_list(List)};
-read(Protocol, ProtocolType) ->
- read_specific(Protocol, ProtocolType).
+read(IProto, {set, Type}) ->
+ #protocol_set_begin{etype = _EType,
+ size = Size} =
+ read(IProto, set_begin),
+ List = [Result || {ok, Result} <-
+ [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]],
+ ok = read(IProto, set_end),
+ {ok, sets:from_list(List)};
-%% NOTE: Keep this in sync with thrift_protocol_behaviour:read
--spec read_specific
- (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}};
- (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}};
- (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}}.
-read_specific(Proto = #protocol{module = Module,
- data = ModuleData}, ProtocolType) ->
- {NewData, Result} = Module:read(ModuleData, ProtocolType),
- {Proto#protocol{data = NewData}, Result}.
+read(#protocol{module = Module,
+ data = ModuleData}, ProtocolType) ->
+ Module:read(ModuleData, ProtocolType).
-read_struct_loop(IProto0, SDict, RTuple) ->
- {IProto1, #protocol_field_begin{type = FType, id = Fid}} =
- thrift_protocol:read(IProto0, field_begin),
+read_struct_loop(IProto, SDict, RTuple) ->
+ #protocol_field_begin{type = FType, id = Fid, name = Name} =
+ thrift_protocol:read(IProto, field_begin),
case {FType, Fid} of
{?tType_STOP, _} ->
- {IProto1, RTuple};
+ RTuple;
_Else ->
case dict:find(Fid, SDict) of
{ok, {Type, Index}} ->
case term_to_typeid(Type) of
FType ->
- {IProto2, {ok, Val}} = read(IProto1, Type),
- {IProto3, ok} = thrift_protocol:read(IProto2, field_end),
+ {ok, Val} = read(IProto, Type),
+ thrift_protocol:read(IProto, field_end),
NewRTuple = setelement(Index, RTuple, Val),
- read_struct_loop(IProto3, SDict, NewRTuple);
+ read_struct_loop(IProto, SDict, NewRTuple);
Expected ->
error_logger:info_msg(
"Skipping field ~p with wrong type (~p != ~p)~n",
[Fid, FType, Expected]),
- skip_field(FType, IProto1, SDict, RTuple)
+ skip_field(FType, IProto, SDict, RTuple)
end;
_Else2 ->
error_logger:info_msg("Skipping field ~p with unknown fid~n", [Fid]),
- skip_field(FType, IProto1, SDict, RTuple)
+ skip_field(FType, IProto, SDict, RTuple)
end
end.
-skip_field(FType, IProto0, SDict, RTuple) ->
+skip_field(FType, IProto, SDict, RTuple) ->
FTypeAtom = thrift_protocol:typeid_to_atom(FType),
- {IProto1, ok} = thrift_protocol:skip(IProto0, FTypeAtom),
- {IProto2, ok} = read(IProto1, field_end),
- read_struct_loop(IProto2, SDict, RTuple).
-
--spec skip(#protocol{}, any()) -> {#protocol{}, ok}.
-
-skip(Proto0, struct) ->
- {Proto1, ok} = read(Proto0, struct_begin),
- {Proto2, ok} = skip_struct_loop(Proto1),
- {Proto3, ok} = read(Proto2, struct_end),
- {Proto3, ok};
-
-skip(Proto0, map) ->
- {Proto1, Map} = read(Proto0, map_begin),
- {Proto2, ok} = skip_map_loop(Proto1, Map),
- {Proto3, ok} = read(Proto2, map_end),
- {Proto3, ok};
-
-skip(Proto0, set) ->
- {Proto1, Set} = read(Proto0, set_begin),
- {Proto2, ok} = skip_set_loop(Proto1, Set),
- {Proto3, ok} = read(Proto2, set_end),
- {Proto3, ok};
-
-skip(Proto0, list) ->
- {Proto1, List} = read(Proto0, list_begin),
- {Proto2, ok} = skip_list_loop(Proto1, List),
- {Proto3, ok} = read(Proto2, list_end),
- {Proto3, ok};
-
-skip(Proto0, Type) when is_atom(Type) ->
- {Proto1, _Ignore} = read(Proto0, Type),
- {Proto1, ok}.
+ thrift_protocol:skip(IProto, FTypeAtom),
+ read(IProto, field_end),
+ read_struct_loop(IProto, SDict, RTuple).
-skip_struct_loop(Proto0) ->
- {Proto1, #protocol_field_begin{type = Type}} = read(Proto0, field_begin),
+skip(Proto, struct) ->
+ ok = read(Proto, struct_begin),
+ ok = skip_struct_loop(Proto),
+ ok = read(Proto, struct_end);
+
+skip(Proto, map) ->
+ Map = read(Proto, map_begin),
+ ok = skip_map_loop(Proto, Map),
+ ok = read(Proto, map_end);
+
+skip(Proto, set) ->
+ Set = read(Proto, set_begin),
+ ok = skip_set_loop(Proto, Set),
+ ok = read(Proto, set_end);
+
+skip(Proto, list) ->
+ List = read(Proto, list_begin),
+ ok = skip_list_loop(Proto, List),
+ ok = read(Proto, list_end);
+
+skip(Proto, Type) when is_atom(Type) ->
+ _Ignore = read(Proto, Type),
+ ok.
+
+
+skip_struct_loop(Proto) ->
+ #protocol_field_begin{type = Type} = read(Proto, field_begin),
case Type of
?tType_STOP ->
- {Proto1, ok};
+ ok;
_Else ->
- {Proto2, ok} = skip(Proto1, Type),
- {Proto3, ok} = read(Proto2, field_end),
- skip_struct_loop(Proto3)
+ skip(Proto, Type),
+ ok = read(Proto, field_end),
+ skip_struct_loop(Proto)
end.
-skip_map_loop(Proto0, Map = #protocol_map_begin{ktype = Ktype,
- vtype = Vtype,
- size = Size}) ->
+skip_map_loop(Proto, Map = #protocol_map_begin{ktype = Ktype,
+ vtype = Vtype,
+ size = Size}) ->
case Size of
N when N > 0 ->
- {Proto1, ok} = skip(Proto0, Ktype),
- {Proto2, ok} = skip(Proto1, Vtype),
- skip_map_loop(Proto2,
+ skip(Proto, Ktype),
+ skip(Proto, Vtype),
+ skip_map_loop(Proto,
Map#protocol_map_begin{size = Size - 1});
- 0 -> {Proto0, ok}
+ 0 -> ok
end.
-skip_set_loop(Proto0, Map = #protocol_set_begin{etype = Etype,
- size = Size}) ->
+skip_set_loop(Proto, Map = #protocol_set_begin{etype = Etype,
+ size = Size}) ->
case Size of
N when N > 0 ->
- {Proto1, ok} = skip(Proto0, Etype),
- skip_set_loop(Proto1,
+ skip(Proto, Etype),
+ skip_set_loop(Proto,
Map#protocol_set_begin{size = Size - 1});
- 0 -> {Proto0, ok}
+ 0 -> ok
end.
-skip_list_loop(Proto0, Map = #protocol_list_begin{etype = Etype,
- size = Size}) ->
+skip_list_loop(Proto, Map = #protocol_list_begin{etype = Etype,
+ size = Size}) ->
case Size of
N when N > 0 ->
- {Proto1, ok} = skip(Proto0, Etype),
- skip_list_loop(Proto1,
+ skip(Proto, Etype),
+ skip_list_loop(Proto,
Map#protocol_list_begin{size = Size - 1});
- 0 -> {Proto0, ok}
+ 0 -> ok
end.
@@ -308,95 +267,90 @@
%% | list() -- for list
%% | dictionary() -- for map
%% | set() -- for set
-%% | any() -- for base types
+%% | term() -- for base types
%%
%% Description:
%%--------------------------------------------------------------------
--spec write(#protocol{}, any()) -> {#protocol{}, ok | {error, _Reason}}.
-
-write(Proto0, {{struct, StructDef}, Data})
+write(Proto, {{struct, StructDef}, Data})
when is_list(StructDef), is_tuple(Data), length(StructDef) == size(Data) - 1 ->
[StructName | Elems] = tuple_to_list(Data),
- {Proto1, ok} = write(Proto0, #protocol_struct_begin{name = StructName}),
- {Proto2, ok} = struct_write_loop(Proto1, StructDef, Elems),
- {Proto3, ok} = write(Proto2, struct_end),
- {Proto3, ok};
+ ok = write(Proto, #protocol_struct_begin{name = StructName}),
+ ok = struct_write_loop(Proto, StructDef, Elems),
+ ok = write(Proto, struct_end),
+ ok;
write(Proto, {{struct, {Module, StructureName}}, Data})
when is_atom(Module),
is_atom(StructureName),
element(1, Data) =:= StructureName ->
+ StructType = Module:struct_info(StructureName),
write(Proto, {Module:struct_info(StructureName), Data});
-write(Proto0, {{list, Type}, Data})
+write(Proto, {{list, Type}, Data})
when is_list(Data) ->
- {Proto1, ok} = write(Proto0,
+ ok = write(Proto,
#protocol_list_begin{
etype = term_to_typeid(Type),
size = length(Data)
}),
- Proto2 = lists:foldl(fun(Elem, ProtoIn) ->
- {ProtoOut, ok} = write(ProtoIn, {Type, Elem}),
- ProtoOut
- end,
- Proto1,
- Data),
- {Proto3, ok} = write(Proto2, list_end),
- {Proto3, ok};
+ lists:foreach(fun(Elem) ->
+ ok = write(Proto, {Type, Elem})
+ end,
+ Data),
+ ok = write(Proto, list_end),
+ ok;
-write(Proto0, {{map, KeyType, ValType}, Data}) ->
- {Proto1, ok} = write(Proto0,
- #protocol_map_begin{
- ktype = term_to_typeid(KeyType),
- vtype = term_to_typeid(ValType),
- size = dict:size(Data)
- }),
- Proto2 = dict:fold(fun(KeyData, ValData, ProtoS0) ->
- {ProtoS1, ok} = write(ProtoS0, {KeyType, KeyData}),
- {ProtoS2, ok} = write(ProtoS1, {ValType, ValData}),
- ProtoS2
- end,
- Proto1,
- Data),
- {Proto3, ok} = write(Proto2, map_end),
- {Proto3, ok};
+write(Proto, {{map, KeyType, ValType}, Data}) ->
+ ok = write(Proto,
+ #protocol_map_begin{
+ ktype = term_to_typeid(KeyType),
+ vtype = term_to_typeid(ValType),
+ size = dict:size(Data)
+ }),
+ dict:fold(fun(KeyData, ValData, _Acc) ->
+ ok = write(Proto, {KeyType, KeyData}),
+ ok = write(Proto, {ValType, ValData})
+ end,
+ _AccO = ok,
+ Data),
+ ok = write(Proto, map_end),
+ ok;
-write(Proto0, {{set, Type}, Data}) ->
+write(Proto, {{set, Type}, Data}) ->
true = sets:is_set(Data),
- {Proto1, ok} = write(Proto0,
- #protocol_set_begin{
- etype = term_to_typeid(Type),
- size = sets:size(Data)
- }),
- Proto2 = sets:fold(fun(Elem, ProtoIn) ->
- {ProtoOut, ok} = write(ProtoIn, {Type, Elem}),
- ProtoOut
- end,
- Proto1,
- Data),
- {Proto3, ok} = write(Proto2, set_end),
- {Proto3, ok};
+ ok = write(Proto,
+ #protocol_set_begin{
+ etype = term_to_typeid(Type),
+ size = sets:size(Data)
+ }),
+ sets:fold(fun(Elem, _Acc) ->
+ ok = write(Proto, {Type, Elem})
+ end,
+ _Acc0 = ok,
+ Data),
+ ok = write(Proto, set_end),
+ ok;
-write(Proto = #protocol{module = Module,
- data = ModuleData}, Data) ->
- {NewData, Result} = Module:write(ModuleData, Data),
- {Proto#protocol{data = NewData}, Result}.
+write(#protocol{module = Module,
+ data = ModuleData}, Data) ->
+ Module:write(ModuleData, Data).
-struct_write_loop(Proto0, [{Fid, Type} | RestStructDef], [Data | RestData]) ->
- NewProto = case Data of
- undefined ->
- Proto0; % null fields are skipped in response
- _ ->
- {Proto1, ok} = write(Proto0,
- #protocol_field_begin{
- type = term_to_typeid(Type),
- id = Fid
- }),
- {Proto2, ok} = write(Proto1, {Type, Data}),
- {Proto3, ok} = write(Proto2, field_end),
- Proto3
- end,
- struct_write_loop(NewProto, RestStructDef, RestData);
+struct_write_loop(Proto, [{Fid, Type} | RestStructDef], [Data | RestData]) ->
+ case Data of
+ undefined ->
+ % null fields are skipped in response
+ skip;
+ _ ->
+ ok = write(Proto,
+ #protocol_field_begin{
+ type = term_to_typeid(Type),
+ id = Fid
+ }),
+ ok = write(Proto, {Type, Data}),
+ ok = write(Proto, field_end)
+ end,
+ struct_write_loop(Proto, RestStructDef, RestData);
struct_write_loop(Proto, [], []) ->
- write(Proto, field_stop).
+ ok = write(Proto, field_stop),
+ ok.
diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl
index 5012e16..5d0012b 100644
--- a/lib/erl/src/thrift_server.erl
+++ b/lib/erl/src/thrift_server.erl
@@ -126,7 +126,7 @@
{stop, Reason, State}
end;
-handle_info({inet_async, _ListenSocket, _Ref, Error}, State) ->
+handle_info({inet_async, ListenSocket, Ref, Error}, State) ->
error_logger:error_msg("Error in acceptor: ~p~n", [Error]),
{stop, Error, State};
@@ -177,7 +177,7 @@
{ok, SocketTransport} = thrift_socket_transport:new(Socket),
{ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
{ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
- {ok, Protocol}
+ {ok, Protocol, Protocol}
end,
spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]).
diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl
index f7c7a02..6794e63 100644
--- a/lib/erl/src/thrift_socket_server.erl
+++ b/lib/erl/src/thrift_socket_server.erl
@@ -166,12 +166,13 @@
new_acceptor(State=#thrift_socket_server{max=0}) ->
error_logger:error_msg("Not accepting new connections"),
State#thrift_socket_server{acceptor=null};
-new_acceptor(State=#thrift_socket_server{listen=Listen,
+new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,
service=Service, handler=Handler,
socket_opts=Opts, framed=Framed
}) ->
Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
[{self(), Listen, Service, Handler, Opts, Framed}]),
+%% error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]),
State#thrift_socket_server{acceptor=Pid}.
acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed})
@@ -187,7 +188,7 @@
false -> thrift_buffered_transport:new(SocketTransport)
end,
{ok, Protocol} = thrift_binary_protocol:new(Transport),
- {ok, Protocol}
+ {ok, IProt=Protocol, OProt=Protocol}
end,
thrift_processor:init({Server, ProtoGen, Service, Handler});
{error, closed} ->
diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl
index 5e1ef02..fcd6944 100644
--- a/lib/erl/src/thrift_socket_transport.erl
+++ b/lib/erl/src/thrift_socket_transport.erl
@@ -29,8 +29,6 @@
-record(data, {socket,
recv_timeout=infinity}).
--type state() :: #data{}.
--include("thrift_transport_behaviour.hrl").
new(Socket) ->
new(Socket, []).
@@ -47,26 +45,25 @@
thrift_transport:new(?MODULE, State).
%% Data :: iolist()
-write(This = #data{socket = Socket}, Data) ->
- {This, gen_tcp:send(Socket, Data)}.
+write(#data{socket = Socket}, Data) ->
+ gen_tcp:send(Socket, Data).
-read(This = #data{socket=Socket, recv_timeout=Timeout}, Len)
+read(#data{socket=Socket, recv_timeout=Timeout}, Len)
when is_integer(Len), Len >= 0 ->
case gen_tcp:recv(Socket, Len, Timeout) of
Err = {error, timeout} ->
error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]),
gen_tcp:close(Socket),
- {This, Err};
- Data ->
- {This, Data}
+ Err;
+ Data -> Data
end.
%% We can't really flush - everything is flushed when we write
-flush(This) ->
- {This, ok}.
+flush(_) ->
+ ok.
-close(This = #data{socket = Socket}) ->
- {This, gen_tcp:close(Socket)}.
+close(#data{socket = Socket}) ->
+ gen_tcp:close(Socket).
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl
index 39f8c05..20c4b5d 100644
--- a/lib/erl/src/thrift_transport.erl
+++ b/lib/erl/src/thrift_transport.erl
@@ -37,42 +37,21 @@
-record(transport, {module, data}).
--ifdef(transport_wrapper_module).
--define(debug_wrap(Transport),
- case Transport#transport.module of
- ?transport_wrapper_module ->
- Transport;
- _Else ->
- {ok, Result} = ?transport_wrapper_module:new(Transport),
- Result
- end).
--else.
--define(debug_wrap(Transport), Transport).
--endif.
-
new(Module, Data) when is_atom(Module) ->
- Transport0 = #transport{module = Module, data = Data},
- Transport1 = ?debug_wrap(Transport0),
- {ok, Transport1}.
+ {ok, #transport{module = Module,
+ data = Data}}.
--spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}.
+%% Data :: iolist()
write(Transport, Data) ->
Module = Transport#transport.module,
- {NewTransData, Result} = Module:write(Transport#transport.data, Data),
- {Transport#transport{data = NewTransData}, Result}.
+ Module:write(Transport#transport.data, Data).
--spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}.
read(Transport, Len) when is_integer(Len) ->
Module = Transport#transport.module,
- {NewTransData, Result} = Module:read(Transport#transport.data, Len),
- {Transport#transport{data = NewTransData}, Result}.
+ Module:read(Transport#transport.data, Len).
--spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
-flush(Transport = #transport{module = Module, data = Data}) ->
- {NewTransData, Result} = Module:flush(Data),
- {Transport#transport{data = NewTransData}, Result}.
+flush(#transport{module = Module, data = Data}) ->
+ Module:flush(Data).
--spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
-close(Transport = #transport{module = Module, data = Data}) ->
- {NewTransData, Result} = Module:close(Data),
- {Transport#transport{data = NewTransData}, Result}.
+close(#transport{module = Module, data = Data}) ->
+ Module:close(Data).
diff --git a/lib/erl/src/thrift_transport_state_test.erl b/lib/erl/src/thrift_transport_state_test.erl
deleted file mode 100644
index e83a44d..0000000
--- a/lib/erl/src/thrift_transport_state_test.erl
+++ /dev/null
@@ -1,117 +0,0 @@
-%%
-%% Licensed to the Apache Software Foundation (ASF) under one
-%% or more contributor license agreements. See the NOTICE file
-%% distributed with this work for additional information
-%% regarding copyright ownership. The ASF licenses this file
-%% to you under the Apache License, Version 2.0 (the
-%% "License"); you may not use this file except in compliance
-%% with the License. You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-
--module(thrift_transport_state_test).
-
--behaviour(gen_server).
--behaviour(thrift_transport).
-
-%% API
--export([new/1]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%% thrift_transport callbacks
--export([write/2, read/2, flush/1, close/1]).
-
--record(trans, {wrapped, % #thrift_transport{}
- version :: integer(),
- counter :: pid()
- }).
--type state() :: #trans{}.
--include("thrift_transport_behaviour.hrl").
-
--record(state, {cversion :: integer()}).
-
-
-new(WrappedTransport) ->
- case gen_server:start_link(?MODULE, [], []) of
- {ok, Pid} ->
- Trans = #trans{wrapped = WrappedTransport,
- version = 0,
- counter = Pid},
- thrift_transport:new(?MODULE, Trans);
- Else ->
- Else
- end.
-
-%%====================================================================
-%% thrift_transport callbacks
-%%====================================================================
-
-write(Transport0 = #trans{wrapped = Wrapped0}, Data) ->
- Transport1 = check_version(Transport0),
- {Wrapped1, Result} = thrift_transport:write(Wrapped0, Data),
- Transport2 = Transport1#trans{wrapped = Wrapped1},
- {Transport2, Result}.
-
-flush(Transport0 = #trans{wrapped = Wrapped0}) ->
- Transport1 = check_version(Transport0),
- {Wrapped1, Result} = thrift_transport:flush(Wrapped0),
- Transport2 = Transport1#trans{wrapped = Wrapped1},
- {Transport2, Result}.
-
-close(Transport0 = #trans{wrapped = Wrapped0}) ->
- Transport1 = check_version(Transport0),
- shutdown_counter(Transport1),
- {Wrapped1, Result} = thrift_transport:close(Wrapped0),
- Transport2 = Transport1#trans{wrapped = Wrapped1},
- {Transport2, Result}.
-
-read(Transport0 = #trans{wrapped = Wrapped0}, Len) ->
- Transport1 = check_version(Transport0),
- {Wrapped1, Result} = thrift_transport:read(Wrapped0, Len),
- Transport2 = Transport1#trans{wrapped = Wrapped1},
- {Transport2, Result}.
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([]) ->
- {ok, #state{cversion = 0}}.
-
-handle_call(check_version, _From, State = #state{cversion = Version}) ->
- {reply, Version, State#state{cversion = Version+1}}.
-
-handle_cast(shutdown, State) ->
- {stop, normal, State}.
-
-handle_info(_Info, State) -> {noreply, State}.
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-terminate(_Reason, _State) -> ok.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-check_version(Transport = #trans{version = Version, counter = Counter}) ->
- case gen_server:call(Counter, check_version) of
- Version ->
- Transport#trans{version = Version+1};
- _Else ->
- % State wasn't propagated properly. Die.
- erlang:error(state_not_propagated)
- end.
-
-shutdown_counter(#trans{counter = Counter}) ->
- gen_server:cast(Counter, shutdown).