Implement buffered transport
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666386 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl
index 4e8cfb3..0605dc1 100644
--- a/lib/alterl/src/thrift_binary_protocol.erl
+++ b/lib/alterl/src/thrift_binary_protocol.erl
@@ -13,7 +13,8 @@
-export([new/1,
read/2,
- write/2
+ write/2,
+ flush_transport/1
]).
-record(binary_protocol, {transport}).
@@ -26,6 +27,8 @@
new(Transport) ->
thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+flush_transport(#binary_protocol{transport = Transport}) ->
+ thrift_transport:flush(Transport).
%%%
%%% instance methods
diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl
new file mode 100644
index 0000000..c16f26a
--- /dev/null
+++ b/lib/alterl/src/thrift_buffered_transport.erl
@@ -0,0 +1,154 @@
+%%%-------------------------------------------------------------------
+%%% File : thrift_buffered_transport.erl
+%%% Author : <todd@lipcon.org>
+%%% Description : Buffered transport for thrift
+%%%
+%%% Created : 30 Jan 2008 by <todd@lipcon.org>
+%%%-------------------------------------------------------------------
+-module(thrift_buffered_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1]).
+
+-record(state, {
+ % The wrapped transport
+ wrapped,
+
+ % a list of binaries which will be concatenated and sent during
+ % a flush.
+ %
+ % *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+ %
+ buffer}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+ case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = binary()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) when is_binary(Data) ->
+ gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transpor) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, {flush}).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+ {ok, #state{wrapped = Wrapped,
+ buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #state{buffer = Buffer}) ->
+ {reply, ok, State#state{buffer = [Data | Buffer]}};
+
+handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) ->
+ Response = thrift_transport:read(Wrapped, Len),
+ {reply, Response, State};
+
+handle_call({flush}, _From, State = #state{buffer = Buffer,
+ wrapped = Wrapped}) ->
+ Concat = concat_binary(lists:reverse(Buffer)),
+ Response = thrift_transport:write(Wrapped, Concat),
+ % todo(todd) - flush wrapped transport here?
+ {reply, Response, State#state{buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
diff --git a/lib/alterl/src/thrift_processor.erl b/lib/alterl/src/thrift_processor.erl
index 217c216..fa33d3b 100644
--- a/lib/alterl/src/thrift_processor.erl
+++ b/lib/alterl/src/thrift_processor.erl
@@ -123,6 +123,7 @@
seqid = 0}),
ok = thrift_protocol:write(OProto, Reply),
ok = thrift_protocol:write(OProto, message_end),
+ ok = thrift_protocol:flush_transport(OProto),
ok.
diff --git a/lib/alterl/src/thrift_protocol.erl b/lib/alterl/src/thrift_protocol.erl
index f50c612..7db7929 100644
--- a/lib/alterl/src/thrift_protocol.erl
+++ b/lib/alterl/src/thrift_protocol.erl
@@ -4,6 +4,7 @@
write/2,
read/2,
skip/2,
+ flush_transport/1,
typeid_to_atom/1,
@@ -17,7 +18,8 @@
behaviour_info(callbacks) ->
[
{read, 2},
- {write, 2}
+ {write, 2},
+ {flush_transport, 1}
];
behaviour_info(_Else) -> undefined.
@@ -27,6 +29,10 @@
data = Data}}.
+flush_transport(#protocol{module = Module,
+ data = Data}) ->
+ Module:flush_transport(Data).
+
typeid_to_atom(?tType_STOP) -> field_stop;
typeid_to_atom(?tType_VOID) -> void;
typeid_to_atom(?tType_BOOL) -> bool;
diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl
index 5760e1c..3f11164 100644
--- a/lib/alterl/src/thrift_server.erl
+++ b/lib/alterl/src/thrift_server.erl
@@ -111,8 +111,9 @@
{ok, Socket} = gen_tcp:accept(ListenSocket),
error_logger:info_msg("Accepted client"),
- {ok, Transport} = thrift_socket_transport:new(Socket),
- {ok, Protocol} = thrift_binary_protocol:new(Transport),
+ {ok, SocketTransport} = thrift_socket_transport:new(Socket),
+ {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
+ {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
thrift_processor:start(Protocol, Protocol, Service, Handler),
receive
diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl
index 2921c20..a1b2e05 100644
--- a/lib/alterl/src/thrift_socket_transport.erl
+++ b/lib/alterl/src/thrift_socket_transport.erl
@@ -4,7 +4,7 @@
-export([new/1,
- write/2, read/2]).
+ write/2, read/2, flush/1]).
-record(data, {socket}).
@@ -16,3 +16,7 @@
read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
gen_tcp:recv(Socket, Len).
+
+% We can't really flush - everything is flushed when we write
+flush(_) ->
+ ok.
diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl
index bf9dc53..c47d90f 100644
--- a/lib/alterl/src/thrift_transport.erl
+++ b/lib/alterl/src/thrift_transport.erl
@@ -4,12 +4,14 @@
new/2,
write/2,
- read/2
+ read/2,
+ flush/1
]).
behaviour_info(callbacks) ->
[{write/2,
- read/2}];
+ read/2,
+ flush/1}];
behaviour_info(_Else) -> undefined.
@@ -27,3 +29,6 @@
read(Transport, Len) when is_integer(Len) ->
Module = Transport#transport.module,
Module:read(Transport#transport.data, Len).
+
+flush(#transport{module = Module, data = Data}) ->
+ Module:flush(Data).