erlang: Refactor thrift_transport and all transport implementations
Note that the buffering transports still use a separate process to
maintain their state. This change only converts them to the
"return-the-new-version"-style API, in which every transport call
returns {NewTransport, Result} instead of the bare Result.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990989 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl
index 7630333..3cdb351 100644
--- a/lib/erl/src/thrift_base64_transport.erl
+++ b/lib/erl/src/thrift_base64_transport.erl
@@ -37,24 +37,27 @@
thrift_transport:new(?MODULE, State).
-write(#b64_transport{wrapped = Wrapped}, Data) ->
- thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))).
+write(This = #b64_transport{wrapped = Wrapped}, Data) ->
+ {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))),
+ {This#b64_transport{wrapped = NewWrapped}, Result}.
%% base64 doesn't support reading quite yet since it would involve
%% nasty buffering and such
-read(#b64_transport{wrapped = Wrapped}, Data) ->
- {error, no_reads_allowed}.
+read(This = #b64_transport{}, _Data) ->
+ {This, {error, no_reads_allowed}}.
-flush(#b64_transport{wrapped = Wrapped}) ->
- thrift_transport:write(Wrapped, <<"\n">>),
- thrift_transport:flush(Wrapped).
+flush(This = #b64_transport{wrapped = Wrapped0}) ->
+ {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>),
+ {Wrapped2, ok} = thrift_transport:flush(Wrapped1),
+ {This#b64_transport{wrapped = Wrapped2}, ok}.
-close(Me = #b64_transport{wrapped = Wrapped}) ->
- flush(Me),
- thrift_transport:close(Wrapped).
+close(This0) ->
+ {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0),
+ {NewWrapped, ok} = thrift_transport:close(Wrapped),
+ {This1#b64_transport{wrapped = NewWrapped}, ok}.
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl
index fcb072b..796089c 100644
--- a/lib/erl/src/thrift_binary_protocol.erl
+++ b/lib/erl/src/thrift_binary_protocol.erl
@@ -61,12 +61,12 @@
flush_transport(This = #binary_protocol{transport = Transport}) ->
- Result = thrift_transport:flush(Transport),
- {This, Result}.
+ {NewTransport, Result} = thrift_transport:flush(Transport),
+ {This#binary_protocol{transport = NewTransport}, Result}.
close_transport(This = #binary_protocol{transport = Transport}) ->
- Result = thrift_transport:close(Transport),
- {This, Result}.
+ {NewTransport, Result} = thrift_transport:close(Transport),
+ {This#binary_protocol{transport = NewTransport}, Result}.
%%%
%%% instance methods
@@ -166,8 +166,8 @@
%% Data :: iolist()
write(This = #binary_protocol{transport = Trans}, Data) ->
- Result = thrift_transport:write(Trans, Data),
- {This, Result}.
+ {NewTransport, Result} = thrift_transport:write(Trans, Data),
+ {This#binary_protocol{transport = NewTransport}, Result}.
%%
@@ -312,8 +312,8 @@
{#binary_protocol{}, {ok, binary()} | {error, _Reason}}.
read_data(This, 0) -> {This, {ok, <<>>}};
read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 ->
- Result = thrift_transport:read(Trans, Len),
- {This, Result}.
+ {NewTransport, Result} = thrift_transport:read(Trans, Len),
+ {This#binary_protocol{transport = NewTransport}, Result}.
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl
index c6df73a..6668549 100644
--- a/lib/erl/src/thrift_buffered_transport.erl
+++ b/lib/erl/src/thrift_buffered_transport.erl
@@ -64,7 +64,7 @@
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
write(Transport, Data) ->
- gen_server:call(Transport, {write, Data}).
+ {Transport, gen_server:call(Transport, {write, Data})}.
%%--------------------------------------------------------------------
%% Function: flush(Transport) -> ok
@@ -72,7 +72,7 @@
%% Description: Flushes the buffer through to the wrapped transport
%%--------------------------------------------------------------------
flush(Transport) ->
- gen_server:call(Transport, flush).
+ {Transport, gen_server:call(Transport, flush)}.
%%--------------------------------------------------------------------
%% Function: close(Transport) -> ok
@@ -80,7 +80,7 @@
%% Description: Closes the transport and the wrapped transport
%%--------------------------------------------------------------------
close(Transport) ->
- gen_server:cast(Transport, close).
+ {Transport, gen_server:cast(Transport, close)}.
%%--------------------------------------------------------------------
%% Function: Read(Transport, Len) -> {ok, Data}
@@ -90,7 +90,7 @@
%% Description: Reads data through from the wrapped transoprt
%%--------------------------------------------------------------------
read(Transport, Len) when is_integer(Len) ->
- gen_server:call(Transport, {read, Len}, _Timeout=10000).
+ {Transport, gen_server:call(Transport, {read, Len}, _Timeout=10000)}.
%%====================================================================
%% gen_server callbacks
@@ -120,14 +120,17 @@
{reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
- Response = thrift_transport:read(Wrapped, Len),
- {reply, Response, State};
+ {NewWrapped, Response} = thrift_transport:read(Wrapped, Len),
+ NewState = State#buffered_transport{wrapped = NewWrapped},
+ {reply, Response, NewState};
handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
- wrapped = Wrapped}) ->
- Response = thrift_transport:write(Wrapped, WBuf),
- thrift_transport:flush(Wrapped),
- {reply, Response, State#buffered_transport{write_buffer = []}}.
+ wrapped = Wrapped0}) ->
+ {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf),
+ {Wrapped2, _} = thrift_transport:flush(Wrapped1),
+ NewState = State#buffered_transport{write_buffer = [],
+ wrapped = Wrapped2},
+ {reply, Response, NewState}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl
index 2645c67..87d9547 100644
--- a/lib/erl/src/thrift_disk_log_transport.erl
+++ b/lib/erl/src/thrift_disk_log_transport.erl
@@ -68,30 +68,33 @@
%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% disk_log_transport is write-only
-read(_State, Len) ->
- {error, no_read_from_disk_log}.
+read(State, Len) ->
+ {State, {error, no_read_from_disk_log}}.
-write(#dl_transport{log = Log}, Data) ->
- disk_log:balog(Log, erlang:iolist_to_binary(Data)).
+write(This = #dl_transport{log = Log}, Data) ->
+ {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.
force_flush(#dl_transport{log = Log}) ->
error_logger:info_msg("~p syncing~n", [?MODULE]),
disk_log:sync(Log).
-flush(#dl_transport{log = Log, sync_every = SE}) ->
+flush(This = #dl_transport{log = Log, sync_every = SE}) ->
case SE of
undefined -> % no time-based sync
disk_log:sync(Log);
_Else -> % sync will happen automagically
ok
- end.
+ end,
+ {This, ok}.
+
+
%% On close, close the underlying log if we're configured to do so.
-close(#dl_transport{close_on_close = false}) ->
- ok;
-close(#dl_transport{log = Log}) ->
- disk_log:lclose(Log).
+close(This = #dl_transport{close_on_close = false}) ->
+ {This, ok};
+close(This = #dl_transport{log = Log}) ->
+ {This, disk_log:lclose(Log)}.
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl
index 7ee1c12..63f2d75 100644
--- a/lib/erl/src/thrift_file_transport.erl
+++ b/lib/erl/src/thrift_file_transport.erl
@@ -65,25 +65,25 @@
%%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-write(#t_file_transport{device = Device, mode = write}, Data) ->
- file:write(Device, Data);
-write(_T, _D) ->
- {error, read_mode}.
+write(This = #t_file_transport{device = Device, mode = write}, Data) ->
+ {This, file:write(Device, Data)};
+write(This, _D) ->
+ {This, {error, read_mode}}.
-read(#t_file_transport{device = Device, mode = read}, Len)
+read(This = #t_file_transport{device = Device, mode = read}, Len)
when is_integer(Len), Len >= 0 ->
- file:read(Device, Len);
-read(_T, _D) ->
- {error, read_mode}.
+ {This, file:read(Device, Len)};
+read(This, _D) ->
+ {This, {error, read_mode}}.
-flush(#t_file_transport{device = Device, mode = write}) ->
- file:sync(Device).
+flush(This = #t_file_transport{device = Device, mode = write}) ->
+ {This, file:sync(Device)}.
-close(#t_file_transport{device = Device, should_close = SC}) ->
+close(This = #t_file_transport{device = Device, should_close = SC}) ->
case SC of
true ->
- file:close(Device);
+ {This, file:close(Device)};
false ->
- ok
+ {This, ok}
end.
diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl
index 92bd588..c1c8850 100644
--- a/lib/erl/src/thrift_framed_transport.erl
+++ b/lib/erl/src/thrift_framed_transport.erl
@@ -62,7 +62,7 @@
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
write(Transport, Data) ->
- gen_server:call(Transport, {write, Data}).
+ {Transport, gen_server:call(Transport, {write, Data})}.
%%--------------------------------------------------------------------
%% Function: flush(Transport) -> ok
@@ -70,7 +70,7 @@
%% Description: Flushes the buffer through to the wrapped transport
%%--------------------------------------------------------------------
flush(Transport) ->
- gen_server:call(Transport, flush).
+ {Transport, gen_server:call(Transport, flush)}.
%%--------------------------------------------------------------------
%% Function: close(Transport) -> ok
@@ -78,7 +78,7 @@
%% Description: Closes the transport and the wrapped transport
%%--------------------------------------------------------------------
close(Transport) ->
- gen_server:cast(Transport, close).
+ {Transport, gen_server:cast(Transport, close)}.
%%--------------------------------------------------------------------
%% Function: Read(Transport, Len) -> {ok, Data}
@@ -88,7 +88,7 @@
%% Description: Reads data through from the wrapped transoprt
%%--------------------------------------------------------------------
read(Transport, Len) when is_integer(Len) ->
- gen_server:call(Transport, {read, Len}).
+ {Transport, gen_server:call(Transport, {read, Len})}.
%%====================================================================
%% gen_server callbacks
@@ -118,22 +118,22 @@
handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
{reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
-handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped0,
read_buffer = RBuf}) ->
- {RBuf1, RBuf1Size} =
+ {Wrapped1, {RBuf1, RBuf1Size}} =
%% if the read buffer is empty, read another frame
%% otherwise, just read from what's left in the buffer
case iolist_size(RBuf) of
0 ->
%% read the frame length
- {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
- thrift_transport:read(Wrapped, 4),
+ {WrappedS1, {ok, <<FrameLen:32/integer-signed-big, _/binary>>}} =
+ thrift_transport:read(Wrapped0, 4),
%% then read the data
- {ok, Bin} =
- thrift_transport:read(Wrapped, FrameLen),
- {Bin, erlang:byte_size(Bin)};
+ {WrappedS2, {ok, Bin}} =
+ thrift_transport:read(WrappedS1, FrameLen),
+ {WrappedS2, {Bin, erlang:byte_size(Bin)}};
Sz ->
- {RBuf, Sz}
+ {Wrapped0, {RBuf, Sz}}
end,
%% pull off Give bytes, return them to the user, leave the rest in the buffer
@@ -141,7 +141,7 @@
<<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
Response = {ok, Data},
- State1 = State#framed_transport{read_buffer=RBuf2},
+ State1 = State#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2},
{reply, Response, State1};
@@ -193,15 +193,15 @@
%%% Internal functions
%%--------------------------------------------------------------------
do_flush(State = #framed_transport{write_buffer = Buffer,
- wrapped = Wrapped}) ->
+ wrapped = Wrapped0}) ->
FrameLen = iolist_size(Buffer),
Data = [<<FrameLen:32/integer-signed-big>>, Buffer],
- Response = thrift_transport:write(Wrapped, Data),
+ {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data),
- thrift_transport:flush(Wrapped),
+ {Wrapped2, _} = thrift_transport:flush(Wrapped1),
- State1 = State#framed_transport{write_buffer = []},
+ State1 = State#framed_transport{wrapped = Wrapped2, write_buffer = []},
{Response, State1}.
min(A,B) when A<B -> A;
diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl
index f0a45fe..44f2313 100644
--- a/lib/erl/src/thrift_http_transport.erl
+++ b/lib/erl/src/thrift_http_transport.erl
@@ -77,7 +77,7 @@
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
write(Transport, Data) ->
- gen_server:call(Transport, {write, Data}).
+ {Transport, gen_server:call(Transport, {write, Data})}.
%%--------------------------------------------------------------------
%% Function: flush(Transport) -> ok
@@ -85,7 +85,7 @@
%% Description: Flushes the buffer, making a request
%%--------------------------------------------------------------------
flush(Transport) ->
- gen_server:call(Transport, flush).
+ {Transport, gen_server:call(Transport, flush)}.
%%--------------------------------------------------------------------
%% Function: close(Transport) -> ok
@@ -93,7 +93,7 @@
%% Description: Closes the transport
%%--------------------------------------------------------------------
close(Transport) ->
- gen_server:cast(Transport, close).
+ {Transport, gen_server:cast(Transport, close)}.
%%--------------------------------------------------------------------
%% Function: Read(Transport, Len) -> {ok, Data}
@@ -103,7 +103,7 @@
%% Description: Reads data through from the wrapped transoprt
%%--------------------------------------------------------------------
read(Transport, Len) when is_integer(Len) ->
- gen_server:call(Transport, {read, Len}).
+ {Transport, gen_server:call(Transport, {read, Len})}.
%%====================================================================
%% gen_server callbacks
diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl
index 34a3668..ae8b6d2 100644
--- a/lib/erl/src/thrift_memory_buffer.erl
+++ b/lib/erl/src/thrift_memory_buffer.erl
@@ -58,7 +58,7 @@
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
write(Transport, Data) ->
- gen_server:call(Transport, {write, Data}).
+ {Transport, gen_server:call(Transport, {write, Data})}.
%%--------------------------------------------------------------------
%% Function: flush(Transport) -> ok
@@ -66,7 +66,7 @@
%% Description: Flushes the buffer through to the wrapped transport
%%--------------------------------------------------------------------
flush(Transport) ->
- gen_server:call(Transport, flush).
+ {Transport, gen_server:call(Transport, flush)}.
%%--------------------------------------------------------------------
%% Function: close(Transport) -> ok
@@ -74,7 +74,7 @@
%% Description: Closes the transport and the wrapped transport
%%--------------------------------------------------------------------
close(Transport) ->
- gen_server:cast(Transport, close).
+ {Transport, gen_server:cast(Transport, close)}.
%%--------------------------------------------------------------------
%% Function: Read(Transport, Len) -> {ok, Data}
@@ -84,7 +84,7 @@
%% Description: Reads data through from the wrapped transoprt
%%--------------------------------------------------------------------
read(Transport, Len) when is_integer(Len) ->
- gen_server:call(Transport, {read, Len}).
+ {Transport, gen_server:call(Transport, {read, Len})}.
%%====================================================================
%% gen_server callbacks
diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl
index 1a8ba81..4c552ae 100644
--- a/lib/erl/src/thrift_socket_transport.erl
+++ b/lib/erl/src/thrift_socket_transport.erl
@@ -47,25 +47,26 @@
thrift_transport:new(?MODULE, State).
%% Data :: iolist()
-write(#data{socket = Socket}, Data) ->
- gen_tcp:send(Socket, Data).
+write(This = #data{socket = Socket}, Data) ->
+ {This, gen_tcp:send(Socket, Data)}.
-read(#data{socket=Socket, recv_timeout=Timeout}, Len)
+read(This = #data{socket=Socket, recv_timeout=Timeout}, Len)
when is_integer(Len), Len >= 0 ->
case gen_tcp:recv(Socket, Len, Timeout) of
Err = {error, timeout} ->
error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]),
gen_tcp:close(Socket),
- Err;
- Data -> Data
+ {This, Err};
+ Data ->
+ {This, Data}
end.
%% We can't really flush - everything is flushed when we write
-flush(_) ->
- ok.
+flush(This) ->
+ {This, ok}.
-close(#data{socket = Socket}) ->
- gen_tcp:close(Socket).
+close(This = #data{socket = Socket}) ->
+ {This, gen_tcp:close(Socket)}.
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl
index 420281c..0de7a10 100644
--- a/lib/erl/src/thrift_transport.erl
+++ b/lib/erl/src/thrift_transport.erl
@@ -41,20 +41,24 @@
{ok, #transport{module = Module,
data = Data}}.
--spec write(#transport{}, iolist() | binary()) -> ok | {error, _Reason}.
+-spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}.
write(Transport, Data) ->
Module = Transport#transport.module,
- Module:write(Transport#transport.data, Data).
+ {NewTransData, Result} = Module:write(Transport#transport.data, Data),
+ {Transport#transport{data = NewTransData}, Result}.
--spec read(#transport{}, non_neg_integer()) -> {ok, binary()} | {error, _Reason}.
+-spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}.
read(Transport, Len) when is_integer(Len) ->
Module = Transport#transport.module,
- Module:read(Transport#transport.data, Len).
+ {NewTransData, Result} = Module:read(Transport#transport.data, Len),
+ {Transport#transport{data = NewTransData}, Result}.
--spec flush(#transport{}) -> ok | {error, _Reason}.
-flush(#transport{module = Module, data = Data}) ->
- Module:flush(Data).
+-spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
+flush(Transport = #transport{module = Module, data = Data}) ->
+ {NewTransData, Result} = Module:flush(Data),
+ {Transport#transport{data = NewTransData}, Result}.
--spec close(#transport{}) -> ok | {error, _Reason}.
-close(#transport{module = Module, data = Data}) ->
- Module:close(Data).
+-spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
+close(Transport = #transport{module = Module, data = Data}) ->
+ {NewTransData, Result} = Module:close(Data),
+ {Transport#transport{data = NewTransData}, Result}.