Implement buffered transport


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666386 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl
index 4e8cfb3..0605dc1 100644
--- a/lib/alterl/src/thrift_binary_protocol.erl
+++ b/lib/alterl/src/thrift_binary_protocol.erl
@@ -13,7 +13,8 @@
 
 -export([new/1,
          read/2,
-         write/2
+         write/2,
+         flush_transport/1
 ]).
 
 -record(binary_protocol, {transport}).
@@ -26,6 +27,8 @@
 new(Transport) ->
     thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
 
+flush_transport(#binary_protocol{transport = Transport}) ->
+    thrift_transport:flush(Transport).
 
 %%%
 %%% instance methods
diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl
new file mode 100644
index 0000000..c16f26a
--- /dev/null
+++ b/lib/alterl/src/thrift_buffered_transport.erl
@@ -0,0 +1,154 @@
+%%%-------------------------------------------------------------------
+%%% File    : thrift_buffered_transport.erl
+%%% Author  :  <todd@lipcon.org>
+%%% Description : Buffered transport for thrift
+%%%
+%%% Created : 30 Jan 2008 by  <todd@lipcon.org>
+%%%-------------------------------------------------------------------
+-module(thrift_buffered_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1]).
+
+-record(state, {
+          % The wrapped transport
+          wrapped,
+
+          % a list of binaries which will be concatenated and sent during
+          % a flush.
+          %
+          % *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+          %
+          buffer}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%% 
+%% Data = binary()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) when is_binary(Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transpor) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, {flush}).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%% 
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    {ok, #state{wrapped = Wrapped,
+                buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #state{buffer = Buffer}) ->
+    {reply, ok, State#state{buffer = [Data | Buffer]}};
+
+handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) ->
+    Response = thrift_transport:read(Wrapped, Len),
+    {reply, Response, State};
+
+handle_call({flush}, _From, State = #state{buffer = Buffer,
+                                           wrapped = Wrapped}) ->
+    Concat = concat_binary(lists:reverse(Buffer)),
+    Response = thrift_transport:write(Wrapped, Concat),
+    % todo(todd) - flush wrapped transport here?
+    {reply, Response, State#state{buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
diff --git a/lib/alterl/src/thrift_processor.erl b/lib/alterl/src/thrift_processor.erl
index 217c216..fa33d3b 100644
--- a/lib/alterl/src/thrift_processor.erl
+++ b/lib/alterl/src/thrift_processor.erl
@@ -123,6 +123,7 @@
                                  seqid = 0}),
     ok = thrift_protocol:write(OProto, Reply),
     ok = thrift_protocol:write(OProto, message_end),
+    ok = thrift_protocol:flush_transport(OProto),
     ok.
 
 
diff --git a/lib/alterl/src/thrift_protocol.erl b/lib/alterl/src/thrift_protocol.erl
index f50c612..7db7929 100644
--- a/lib/alterl/src/thrift_protocol.erl
+++ b/lib/alterl/src/thrift_protocol.erl
@@ -4,6 +4,7 @@
          write/2,
          read/2,
          skip/2,
+         flush_transport/1,
 
          typeid_to_atom/1,
 
@@ -17,7 +18,8 @@
 behaviour_info(callbacks) ->
     [
      {read, 2},
-     {write, 2}
+     {write, 2},
+     {flush_transport, 1}
     ];
 behaviour_info(_Else) -> undefined.
 
@@ -27,6 +29,10 @@
                    data = Data}}.
 
 
+flush_transport(#protocol{module = Module,
+                          data = Data}) ->
+    Module:flush_transport(Data).
+
 typeid_to_atom(?tType_STOP) -> field_stop;
 typeid_to_atom(?tType_VOID) -> void;
 typeid_to_atom(?tType_BOOL) -> bool;
diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl
index 5760e1c..3f11164 100644
--- a/lib/alterl/src/thrift_server.erl
+++ b/lib/alterl/src/thrift_server.erl
@@ -111,8 +111,9 @@
     {ok, Socket} = gen_tcp:accept(ListenSocket),
     error_logger:info_msg("Accepted client"),
 
-    {ok, Transport} = thrift_socket_transport:new(Socket),
-    {ok, Protocol} = thrift_binary_protocol:new(Transport),
+    {ok, SocketTransport} = thrift_socket_transport:new(Socket),
+    {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
+    {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
 
     thrift_processor:start(Protocol, Protocol, Service, Handler),
     receive
diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl
index 2921c20..a1b2e05 100644
--- a/lib/alterl/src/thrift_socket_transport.erl
+++ b/lib/alterl/src/thrift_socket_transport.erl
@@ -4,7 +4,7 @@
 
 -export([new/1,
          
-         write/2, read/2]).
+         write/2, read/2, flush/1]).
 
 -record(data, {socket}).
 
@@ -16,3 +16,7 @@
 
 read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
     gen_tcp:recv(Socket, Len).
+
+% We can't really flush - everything is flushed when we write
+flush(_) ->
+     ok.
diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl
index bf9dc53..c47d90f 100644
--- a/lib/alterl/src/thrift_transport.erl
+++ b/lib/alterl/src/thrift_transport.erl
@@ -4,12 +4,14 @@
 
          new/2,
          write/2,
-         read/2
+         read/2,
+         flush/1
         ]).
 
 behaviour_info(callbacks) ->
     [{write/2,
-      read/2}];
+      read/2,
+      flush/1}];
 behaviour_info(_Else) -> undefined.
 
 
@@ -27,3 +29,6 @@
 read(Transport, Len) when is_integer(Len) ->
     Module = Transport#transport.module,
     Module:read(Transport#transport.data, Len).
+
+flush(#transport{module = Module, data = Data}) ->
+    Module:flush(Data).