erlang: Refactor thrift_transport and all transport implementations

Note that the buffering transports still use a separate process to
maintain their state.  This change only converts them to a
"return-the-new-version"-style API.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990989 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl
index 7630333..3cdb351 100644
--- a/lib/erl/src/thrift_base64_transport.erl
+++ b/lib/erl/src/thrift_base64_transport.erl
@@ -37,24 +37,27 @@
     thrift_transport:new(?MODULE, State).
 
 
-write(#b64_transport{wrapped = Wrapped}, Data) ->
-    thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))).
+write(This = #b64_transport{wrapped = Wrapped}, Data) ->
+    {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))),
+    {This#b64_transport{wrapped = NewWrapped}, Result}.
 
 
 %% base64 doesn't support reading quite yet since it would involve
 %% nasty buffering and such
-read(#b64_transport{wrapped = Wrapped}, Data) ->
-    {error, no_reads_allowed}.
+read(This = #b64_transport{}, _Data) ->
+    {This, {error, no_reads_allowed}}.
 
 
-flush(#b64_transport{wrapped = Wrapped}) ->
-    thrift_transport:write(Wrapped, <<"\n">>),
-    thrift_transport:flush(Wrapped).
+flush(This = #b64_transport{wrapped = Wrapped0}) ->
+    {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>),
+    {Wrapped2, ok} = thrift_transport:flush(Wrapped1),
+    {This#b64_transport{wrapped = Wrapped2}, ok}.
 
 
-close(Me = #b64_transport{wrapped = Wrapped}) ->
-    flush(Me),
-    thrift_transport:close(Wrapped).
+close(This0) ->
+    {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0),
+    {NewWrapped, ok} = thrift_transport:close(Wrapped),
+    {This1#b64_transport{wrapped = NewWrapped}, ok}.
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl
index fcb072b..796089c 100644
--- a/lib/erl/src/thrift_binary_protocol.erl
+++ b/lib/erl/src/thrift_binary_protocol.erl
@@ -61,12 +61,12 @@
 
 
 flush_transport(This = #binary_protocol{transport = Transport}) ->
-    Result = thrift_transport:flush(Transport),
-    {This, Result}.
+    {NewTransport, Result} = thrift_transport:flush(Transport),
+    {This#binary_protocol{transport = NewTransport}, Result}.
 
 close_transport(This = #binary_protocol{transport = Transport}) ->
-    Result = thrift_transport:close(Transport),
-    {This, Result}.
+    {NewTransport, Result} = thrift_transport:close(Transport),
+    {This#binary_protocol{transport = NewTransport}, Result}.
 
 %%%
 %%% instance methods
@@ -166,8 +166,8 @@
 
 %% Data :: iolist()
 write(This = #binary_protocol{transport = Trans}, Data) ->
-    Result = thrift_transport:write(Trans, Data),
-    {This, Result}.
+    {NewTransport, Result} = thrift_transport:write(Trans, Data),
+    {This#binary_protocol{transport = NewTransport}, Result}.
 
 %%
 
@@ -312,8 +312,8 @@
     {#binary_protocol{}, {ok, binary()} | {error, _Reason}}.
 read_data(This, 0) -> {This, {ok, <<>>}};
 read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 ->
-    Result = thrift_transport:read(Trans, Len),
-    {This, Result}.
+    {NewTransport, Result} = thrift_transport:read(Trans, Len),
+    {This#binary_protocol{transport = NewTransport}, Result}.
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl
index c6df73a..6668549 100644
--- a/lib/erl/src/thrift_buffered_transport.erl
+++ b/lib/erl/src/thrift_buffered_transport.erl
@@ -64,7 +64,7 @@
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
 write(Transport, Data) ->
-    gen_server:call(Transport, {write, Data}).
+    {Transport, gen_server:call(Transport, {write, Data})}.
 
 %%--------------------------------------------------------------------
 %% Function: flush(Transport) -> ok
@@ -72,7 +72,7 @@
 %% Description: Flushes the buffer through to the wrapped transport
 %%--------------------------------------------------------------------
 flush(Transport) ->
-    gen_server:call(Transport, flush).
+    {Transport, gen_server:call(Transport, flush)}.
 
 %%--------------------------------------------------------------------
 %% Function: close(Transport) -> ok
@@ -80,7 +80,7 @@
 %% Description: Closes the transport and the wrapped transport
 %%--------------------------------------------------------------------
 close(Transport) ->
-    gen_server:cast(Transport, close).
+    {Transport, gen_server:cast(Transport, close)}.
 
 %%--------------------------------------------------------------------
 %% Function: Read(Transport, Len) -> {ok, Data}
@@ -90,7 +90,7 @@
 %% Description: Reads data through from the wrapped transoprt
 %%--------------------------------------------------------------------
 read(Transport, Len) when is_integer(Len) ->
-    gen_server:call(Transport, {read, Len}, _Timeout=10000).
+    {Transport, gen_server:call(Transport, {read, Len}, _Timeout=10000)}.
 
 %%====================================================================
 %% gen_server callbacks
@@ -120,14 +120,17 @@
     {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
 
 handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
-    Response = thrift_transport:read(Wrapped, Len),
-    {reply, Response, State};
+    {NewWrapped, Response} = thrift_transport:read(Wrapped, Len),
+    NewState = State#buffered_transport{wrapped = NewWrapped},
+    {reply, Response, NewState};
 
 handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
-                                                      wrapped = Wrapped}) ->
-    Response = thrift_transport:write(Wrapped, WBuf),
-    thrift_transport:flush(Wrapped),
-    {reply, Response, State#buffered_transport{write_buffer = []}}.
+                                                      wrapped = Wrapped0}) ->
+    {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf),
+    {Wrapped2, _} = thrift_transport:flush(Wrapped1),
+    NewState = State#buffered_transport{write_buffer = [],
+                                        wrapped = Wrapped2},
+    {reply, Response, NewState}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl
index 2645c67..87d9547 100644
--- a/lib/erl/src/thrift_disk_log_transport.erl
+++ b/lib/erl/src/thrift_disk_log_transport.erl
@@ -68,30 +68,33 @@
 %%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 %% disk_log_transport is write-only
-read(_State, Len) ->
-    {error, no_read_from_disk_log}.
+read(State, _Len) -> % length is ignored: every read is refused, state unchanged
+    {State, {error, no_read_from_disk_log}}.
 
-write(#dl_transport{log = Log}, Data) ->
-    disk_log:balog(Log, erlang:iolist_to_binary(Data)).
+write(This = #dl_transport{log = Log}, Data) ->
+    {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}.
 
 force_flush(#dl_transport{log = Log}) ->
     error_logger:info_msg("~p syncing~n", [?MODULE]),
     disk_log:sync(Log).
 
-flush(#dl_transport{log = Log, sync_every = SE}) ->
+flush(This = #dl_transport{log = Log, sync_every = SE}) ->
     case SE of
         undefined -> % no time-based sync
             disk_log:sync(Log);
         _Else ->     % sync will happen automagically
             ok
-    end.
+    end,
+    {This, ok}.
+
+
 
 
 %% On close, close the underlying log if we're configured to do so.
-close(#dl_transport{close_on_close = false}) ->
-    ok;
-close(#dl_transport{log = Log}) ->
-    disk_log:lclose(Log).
+close(This = #dl_transport{close_on_close = false}) ->
+    {This, ok};
+close(This = #dl_transport{log = Log}) ->
+    {This, disk_log:lclose(Log)}.
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl
index 7ee1c12..63f2d75 100644
--- a/lib/erl/src/thrift_file_transport.erl
+++ b/lib/erl/src/thrift_file_transport.erl
@@ -65,25 +65,25 @@
 
 %%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-write(#t_file_transport{device = Device, mode = write}, Data) ->
-    file:write(Device, Data);
-write(_T, _D) ->
-    {error, read_mode}.
+write(This = #t_file_transport{device = Device, mode = write}, Data) ->
+    {This, file:write(Device, Data)};
+write(This, _D) ->
+    {This, {error, read_mode}}.
 
 
-read(#t_file_transport{device = Device, mode = read}, Len)
+read(This = #t_file_transport{device = Device, mode = read}, Len)
   when is_integer(Len), Len >= 0 ->
-    file:read(Device, Len);
-read(_T, _D) ->
-    {error, read_mode}.
+    {This, file:read(Device, Len)};
+read(This, _D) ->
+    {This, {error, read_mode}}.
 
-flush(#t_file_transport{device = Device, mode = write}) ->
-    file:sync(Device).
+flush(This = #t_file_transport{device = Device, mode = write}) ->
+    {This, file:sync(Device)}.
 
-close(#t_file_transport{device = Device, should_close = SC}) ->
+close(This = #t_file_transport{device = Device, should_close = SC}) ->
     case SC of
         true ->
-            file:close(Device);
+            {This, file:close(Device)};
         false ->
-            ok
+            {This, ok}
     end.
diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl
index 92bd588..c1c8850 100644
--- a/lib/erl/src/thrift_framed_transport.erl
+++ b/lib/erl/src/thrift_framed_transport.erl
@@ -62,7 +62,7 @@
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
 write(Transport, Data) ->
-    gen_server:call(Transport, {write, Data}).
+    {Transport, gen_server:call(Transport, {write, Data})}.
 
 %%--------------------------------------------------------------------
 %% Function: flush(Transport) -> ok
@@ -70,7 +70,7 @@
 %% Description: Flushes the buffer through to the wrapped transport
 %%--------------------------------------------------------------------
 flush(Transport) ->
-    gen_server:call(Transport, flush).
+    {Transport, gen_server:call(Transport, flush)}.
 
 %%--------------------------------------------------------------------
 %% Function: close(Transport) -> ok
@@ -78,7 +78,7 @@
 %% Description: Closes the transport and the wrapped transport
 %%--------------------------------------------------------------------
 close(Transport) ->
-    gen_server:cast(Transport, close).
+    {Transport, gen_server:cast(Transport, close)}.
 
 %%--------------------------------------------------------------------
 %% Function: Read(Transport, Len) -> {ok, Data}
@@ -88,7 +88,7 @@
 %% Description: Reads data through from the wrapped transoprt
 %%--------------------------------------------------------------------
 read(Transport, Len) when is_integer(Len) ->
-    gen_server:call(Transport, {read, Len}).
+    {Transport, gen_server:call(Transport, {read, Len})}.
 
 %%====================================================================
 %% gen_server callbacks
@@ -118,22 +118,22 @@
 handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
     {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
 
-handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped0,
                                                           read_buffer = RBuf}) ->
-    {RBuf1, RBuf1Size} =
+    {Wrapped1, {RBuf1, RBuf1Size}} =
         %% if the read buffer is empty, read another frame
         %% otherwise, just read from what's left in the buffer
         case iolist_size(RBuf) of
             0 ->
                 %% read the frame length
-                {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
-                    thrift_transport:read(Wrapped, 4),
+                {WrappedS1, {ok, <<FrameLen:32/integer-signed-big, _/binary>>}} =
+                    thrift_transport:read(Wrapped0, 4),
                 %% then read the data
-                {ok, Bin} =
-                    thrift_transport:read(Wrapped, FrameLen),
-                {Bin, erlang:byte_size(Bin)};
+                {WrappedS2, {ok, Bin}} =
+                    thrift_transport:read(WrappedS1, FrameLen),
+                {WrappedS2, {Bin, erlang:byte_size(Bin)}};
             Sz ->
-                {RBuf, Sz}
+                {Wrapped0, {RBuf, Sz}}
         end,
 
     %% pull off Give bytes, return them to the user, leave the rest in the buffer
@@ -141,7 +141,7 @@
     <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
 
     Response = {ok, Data},
-    State1 = State#framed_transport{read_buffer=RBuf2},
+    State1 = State#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2},
 
     {reply, Response, State1};
 
@@ -193,15 +193,15 @@
 %%% Internal functions
 %%--------------------------------------------------------------------
 do_flush(State = #framed_transport{write_buffer = Buffer,
-                                   wrapped = Wrapped}) ->
+                                   wrapped = Wrapped0}) ->
     FrameLen = iolist_size(Buffer),
     Data     = [<<FrameLen:32/integer-signed-big>>, Buffer],
 
-    Response = thrift_transport:write(Wrapped, Data),
+    {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data),
 
-    thrift_transport:flush(Wrapped),
+    {Wrapped2, _} = thrift_transport:flush(Wrapped1),
 
-    State1 = State#framed_transport{write_buffer = []},
+    State1 = State#framed_transport{wrapped = Wrapped2, write_buffer = []},
     {Response, State1}.
 
 min(A,B) when A<B -> A;
diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl
index f0a45fe..44f2313 100644
--- a/lib/erl/src/thrift_http_transport.erl
+++ b/lib/erl/src/thrift_http_transport.erl
@@ -77,7 +77,7 @@
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
 write(Transport, Data) ->
-    gen_server:call(Transport, {write, Data}).
+    {Transport, gen_server:call(Transport, {write, Data})}.
 
 %%--------------------------------------------------------------------
 %% Function: flush(Transport) -> ok
@@ -85,7 +85,7 @@
 %% Description: Flushes the buffer, making a request
 %%--------------------------------------------------------------------
 flush(Transport) ->
-    gen_server:call(Transport, flush).
+    {Transport, gen_server:call(Transport, flush)}.
 
 %%--------------------------------------------------------------------
 %% Function: close(Transport) -> ok
@@ -93,7 +93,7 @@
 %% Description: Closes the transport
 %%--------------------------------------------------------------------
 close(Transport) ->
-    gen_server:cast(Transport, close).
+    {Transport, gen_server:cast(Transport, close)}.
 
 %%--------------------------------------------------------------------
 %% Function: Read(Transport, Len) -> {ok, Data}
@@ -103,7 +103,7 @@
 %% Description: Reads data through from the wrapped transoprt
 %%--------------------------------------------------------------------
 read(Transport, Len) when is_integer(Len) ->
-    gen_server:call(Transport, {read, Len}).
+    {Transport, gen_server:call(Transport, {read, Len})}.
 
 %%====================================================================
 %% gen_server callbacks
diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl
index 34a3668..ae8b6d2 100644
--- a/lib/erl/src/thrift_memory_buffer.erl
+++ b/lib/erl/src/thrift_memory_buffer.erl
@@ -58,7 +58,7 @@
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
 write(Transport, Data) ->
-    gen_server:call(Transport, {write, Data}).
+    {Transport, gen_server:call(Transport, {write, Data})}.
 
 %%--------------------------------------------------------------------
 %% Function: flush(Transport) -> ok
@@ -66,7 +66,7 @@
 %% Description: Flushes the buffer through to the wrapped transport
 %%--------------------------------------------------------------------
 flush(Transport) ->
-    gen_server:call(Transport, flush).
+    {Transport, gen_server:call(Transport, flush)}.
 
 %%--------------------------------------------------------------------
 %% Function: close(Transport) -> ok
@@ -74,7 +74,7 @@
 %% Description: Closes the transport and the wrapped transport
 %%--------------------------------------------------------------------
 close(Transport) ->
-    gen_server:cast(Transport, close).
+    {Transport, gen_server:cast(Transport, close)}.
 
 %%--------------------------------------------------------------------
 %% Function: Read(Transport, Len) -> {ok, Data}
@@ -84,7 +84,7 @@
 %% Description: Reads data through from the wrapped transoprt
 %%--------------------------------------------------------------------
 read(Transport, Len) when is_integer(Len) ->
-    gen_server:call(Transport, {read, Len}).
+    {Transport, gen_server:call(Transport, {read, Len})}.
 
 %%====================================================================
 %% gen_server callbacks
diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl
index 1a8ba81..4c552ae 100644
--- a/lib/erl/src/thrift_socket_transport.erl
+++ b/lib/erl/src/thrift_socket_transport.erl
@@ -47,25 +47,26 @@
     thrift_transport:new(?MODULE, State).
 
 %% Data :: iolist()
-write(#data{socket = Socket}, Data) ->
-    gen_tcp:send(Socket, Data).
+write(This = #data{socket = Socket}, Data) ->
+    {This, gen_tcp:send(Socket, Data)}.
 
-read(#data{socket=Socket, recv_timeout=Timeout}, Len)
+read(This = #data{socket=Socket, recv_timeout=Timeout}, Len)
   when is_integer(Len), Len >= 0 ->
     case gen_tcp:recv(Socket, Len, Timeout) of
         Err = {error, timeout} ->
             error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]),
             gen_tcp:close(Socket),
-            Err;
-        Data -> Data
+            {This, Err};
+        Data ->
+            {This, Data}
     end.
 
 %% We can't really flush - everything is flushed when we write
-flush(_) ->
-    ok.
+flush(This) ->
+    {This, ok}.
 
-close(#data{socket = Socket}) ->
-    gen_tcp:close(Socket).
+close(This = #data{socket = Socket}) ->
+    {This, gen_tcp:close(Socket)}.
 
 
 %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl
index 420281c..0de7a10 100644
--- a/lib/erl/src/thrift_transport.erl
+++ b/lib/erl/src/thrift_transport.erl
@@ -41,20 +41,24 @@
     {ok, #transport{module = Module,
                     data = Data}}.
 
--spec write(#transport{}, iolist() | binary()) -> ok | {error, _Reason}.
+-spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}.
 write(Transport, Data) ->
     Module = Transport#transport.module,
-    Module:write(Transport#transport.data, Data).
+    {NewTransData, Result} = Module:write(Transport#transport.data, Data),
+    {Transport#transport{data = NewTransData}, Result}.
 
--spec read(#transport{}, non_neg_integer()) -> {ok, binary()} | {error, _Reason}.
+-spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}.
 read(Transport, Len) when is_integer(Len) ->
     Module = Transport#transport.module,
-    Module:read(Transport#transport.data, Len).
+    {NewTransData, Result} = Module:read(Transport#transport.data, Len),
+    {Transport#transport{data = NewTransData}, Result}.
 
--spec flush(#transport{}) -> ok | {error, _Reason}.
-flush(#transport{module = Module, data = Data}) ->
-    Module:flush(Data).
+-spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
+flush(Transport = #transport{module = Module, data = Data}) ->
+    {NewTransData, Result} = Module:flush(Data),
+    {Transport#transport{data = NewTransData}, Result}.
 
--spec close(#transport{}) -> ok | {error, _Reason}.
-close(#transport{module = Module, data = Data}) ->
-    Module:close(Data).
+-spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}.
+close(Transport = #transport{module = Module, data = Data}) ->
+    {NewTransData, Result} = Module:close(Data),
+    {Transport#transport{data = NewTransData}, Result}.