Erlang: add framed_transport and non-strict binary_protocol

- thrift_client now takes as its fourth parameter Options: framed, strict_{read,write}, connect_timeout (P.S. fourth param used to be Timeout)
- binary protocol now takes options: strict_{read,write}
- buffers in framed and buffered transport are now iolists and not reversed lists of binaries
- rename buffer in buffered transport "write_buffer" to match framed transport


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666447 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl
index f115b2f..da43d14 100644
--- a/lib/alterl/src/thrift_binary_protocol.erl
+++ b/lib/alterl/src/thrift_binary_protocol.erl
@@ -11,22 +11,37 @@
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--export([new/1,
+-export([new/1, new/2,
          read/2,
          write/2,
          flush_transport/1,
          close_transport/1
-]).
+        ]).
 
--record(binary_protocol, {transport}).
-
+-record(binary_protocol, {transport,
+                          strict_read=true,
+                          strict_write=true
+                         }).
 
 -define(VERSION_MASK, 16#FFFF0000).
 -define(VERSION_1, 16#80010000).
-
+-define(TYPE_MASK, 16#000000ff).
 
 new(Transport) ->
-    thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+    new(Transport, _Options = []).
+
+new(Transport, Options) ->
+    State  = #binary_protocol{transport = Transport},
+    State1 = parse_options(Options, State),
+    thrift_protocol:new(?MODULE, State1).
+
+parse_options([], State) ->
+    State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#binary_protocol{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#binary_protocol{strict_write=Bool}).
+
 
 flush_transport(#binary_protocol{transport = Transport}) ->
     thrift_transport:flush(Transport).
@@ -42,9 +57,16 @@
         name = Name,
         type = Type,
         seqid = Seqid}) ->
-    write(This, {i32, ?VERSION_1 bor Type}),
-    write(This, {string, Name}),
-    write(This, {i32, Seqid}),
+    case This#binary_protocol.strict_write of
+        true ->
+            write(This, {i32, ?VERSION_1 bor Type}),
+            write(This, {string, Name}),
+            write(This, {i32, Seqid});
+        false ->
+            write(This, {string, Name}),
+            write(This, {byte, Type}),
+            write(This, {i32, Seqid})
+    end,
     ok;
 
 write(This, message_end) -> ok;
@@ -121,20 +143,40 @@
     write(This, {i32, size(Bin)}),
     write(This, Bin);
 
-write(This, Binary) when is_binary(Binary) ->
-    thrift_transport:write(This#binary_protocol.transport, Binary).
+%% Data :: iolist()
+write(This, Data) ->
+    thrift_transport:write(This#binary_protocol.transport, Data).
 
 %%
 
 read(This, message_begin) ->
     case read(This, i32) of
-        {ok, Version} when Version band ?VERSION_MASK == ?VERSION_1 ->
-            Type = Version band 16#000000ff,
+        {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
+            %% we're at version 1
             {ok, Name}  = read(This, string),
+            Type        = Sz band ?TYPE_MASK,
             {ok, SeqId} = read(This, i32),
             #protocol_message_begin{name  = binary_to_list(Name),
                                     type  = Type,
                                     seqid = SeqId};
+
+        {ok, Sz} when Sz < 0 ->
+            %% there's a version number but it's unexpected
+            {error, {bad_binary_protocol_version, Sz}};
+
+        {ok, Sz} when This#binary_protocol.strict_read =:= true ->
+            %% strict_read is true and there's no version header; that's an error
+            {error, no_binary_protocol_version};
+
+        {ok, Sz} when This#binary_protocol.strict_read =:= false ->
+            %% strict_read is false, so just read the old way
+            {ok, Name}  = read(This, Sz),
+            {ok, Type}  = read(This, byte),
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name  = binary_to_list(Name),
+                                    type  = Type,
+                                    seqid = SeqId};
+
         Err = {error, closed} -> Err;
         Err = {error, ebadf}  -> Err
     end;
diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl
index d79e987..575245d 100644
--- a/lib/alterl/src/thrift_buffered_transport.erl
+++ b/lib/alterl/src/thrift_buffered_transport.erl
@@ -21,11 +21,7 @@
 -export([write/2, read/2, flush/1, close/1]).
 
 -record(buffered_transport, {wrapped, % a thrift_transport
-                             buffer
-                             %% a list of binaries which will be concatenated and sent during
-                             %% a flush.
-                             %%
-                             %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+                             write_buffer % iolist()
                             }).
 
 %%====================================================================
@@ -46,11 +42,11 @@
 %%--------------------------------------------------------------------
 %% Function: write(Transport, Data) -> ok
 %%
-%% Data = binary()
+%% Data = iolist()
 %%
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
     gen_server:call(Transport, {write, Data}).
 
 %%--------------------------------------------------------------------
@@ -94,7 +90,7 @@
     %% TODO(cpiro): need to trap exits here so when transport exits
     %% normally from under our feet we exit normally
     {ok, #buffered_transport{wrapped = Wrapped,
-                             buffer = []}}.
+                             write_buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +101,18 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer = Buffer}) ->
-    {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) ->
+    {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
 
 handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
     Response = thrift_transport:read(Wrapped, Len),
     {reply, Response, State};
 
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
                                                       wrapped = Wrapped}) ->
-    Concat   = concat_binary(lists:reverse(Buffer)),
-    Response = thrift_transport:write(Wrapped, Concat),
+    Response = thrift_transport:write(Wrapped, WBuf),
     thrift_transport:flush(Wrapped),
-    {reply, Response, State#buffered_transport{buffer = []}}.
+    {reply, Response, State#buffered_transport{write_buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,9 +120,9 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling cast messages
 %%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
                                                wrapped = Wrapped}) ->
-    thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+    thrift_transport:write(Wrapped, WBuf),
     %% Wrapped is closed by terminate/2
     %%  error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
     {stop, normal, State};
diff --git a/lib/alterl/src/thrift_client.erl b/lib/alterl/src/thrift_client.erl
index af7aea9..779595c 100644
--- a/lib/alterl/src/thrift_client.erl
+++ b/lib/alterl/src/thrift_client.erl
@@ -20,7 +20,12 @@
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--record(state, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid,
+                strict_read = true,
+                strict_write = true,
+                framed = false,
+                connect_timeout = infinity
+               }).
 
 %%====================================================================
 %% API
@@ -30,10 +35,10 @@
 %% Description: Starts the server
 %%--------------------------------------------------------------------
 start_link(Host, Port, Service) ->
-    start_link(Host, Port, Service, _Timeout = infinity).
+    start_link(Host, Port, Service, []).
 
-start_link(Host, Port, Service, Timeout) when is_integer(Port), is_atom(Service) ->
-    gen_server:start_link(?MODULE, [Host, Port, Service, Timeout], []).
+start_link(Host, Port, Service, Options) when is_integer(Port), is_atom(Service), is_list(Options) ->
+    gen_server:start_link(?MODULE, [Host, Port, Service, Options], []).
 
 call(Client, Function, Args)
   when is_pid(Client), is_atom(Function), is_list(Args) ->
@@ -57,24 +62,41 @@
 %%                         {stop, Reason}
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
-init([Host, Port, Service]) ->
-    init([Host, Port, Service, infinity]);
+init([Host, Port, Service, Options]) ->
+    State = parse_options(Options, #state{}),
 
-init([Host, Port, Service, Timeout]) ->
     {ok, Sock} = gen_tcp:connect(Host, Port,
                                  [binary,
                                   {packet, 0},
                                   {active, false},
                                   {nodelay, true}
                                  ],
-                                Timeout),
+                                State#state.connect_timeout),
 
-    {ok, Transport}    = thrift_socket_transport:new(Sock),
-    {ok, BufTransport} = thrift_buffered_transport:new(Transport),
-    {ok, Protocol}     = thrift_binary_protocol:new(BufTransport),
-    {ok, #state{service  = Service,
-                protocol = Protocol,
-                seqid    = 0}}.
+    {ok, Transport} = thrift_socket_transport:new(Sock),
+    {ok, BufTransport} =
+        case State#state.framed of
+            true  -> thrift_framed_transport:new(Transport);
+            false -> thrift_buffered_transport:new(Transport)
+        end,
+    {ok, Protocol} = thrift_binary_protocol:new(BufTransport,
+                         [{strict_read,  State#state.strict_read},
+                          {strict_write, State#state.strict_write}]),
+
+    {ok, State#state{service  = Service,
+                     protocol = Protocol,
+                     seqid    = 0}}.
+
+parse_options([], State) ->
+    State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{strict_write=Bool});
+parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{framed=Bool});
+parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_integer(TO) ->
+    parse_options(Rest, State#state{connect_timeout=TO}).
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
diff --git a/lib/alterl/src/thrift_framed_transport.erl b/lib/alterl/src/thrift_framed_transport.erl
new file mode 100644
index 0000000..814e0d9
--- /dev/null
+++ b/lib/alterl/src/thrift_framed_transport.erl
@@ -0,0 +1,198 @@
+%%%-------------------------------------------------------------------
+%%% File    : thrift_framed_transport.erl
+%%% Author  : <cpiro@facebook.com>
+%%% Description : Framed transport for thrift
+%%%
+%%% Created : 12 Mar 2008 by <cpiro@facebook.com>
+%%%-------------------------------------------------------------------
+-module(thrift_framed_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(framed_transport, {wrapped, % a thrift_transport
+                           read_buffer, % iolist()
+                           write_buffer % iolist()
+                          }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Else ->
+            Else
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+    gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    %% TODO(cpiro): need to trap exits here so when transport exits
+    %% normally from under our feet we exit normally
+    {ok, #framed_transport{wrapped = Wrapped,
+                           read_buffer = [],
+                           write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
+    {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+                                                          read_buffer = RBuf}) ->
+    {RBuf1, RBuf1Size} =
+        %% if the read buffer is empty, read another frame
+        %% otherwise, just read from what's left in the buffer
+        case iolist_size(RBuf) of
+            0 ->
+                %% read the frame length
+                {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+                    thrift_transport:read(Wrapped, 4),
+                %% then read the data
+                {ok, Bin} =
+                    thrift_transport:read(Wrapped, FrameLen),
+                {Bin, size(Bin)};
+            Sz ->
+                {RBuf, Sz}
+        end,
+
+    %% pull off Give bytes, return them to the user, leave the rest in the buffer
+    Give = min(RBuf1Size, Len),
+    <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
+
+    Response = {ok, Data},
+    State1 = State#framed_transport{read_buffer=RBuf2},
+
+    {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+    {Response, State1} = do_flush(State),
+    {reply, Response, State1}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+    {_, State1} = do_flush(State),
+    %% Wrapped is closed by terminate/2
+    %%  error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
+    {stop, normal, State};
+handle_cast(Msg, State=#framed_transport{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
+    thrift_transport:close(Wrapped),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+                                   wrapped = Wrapped}) ->
+    FrameLen = iolist_size(Buffer),
+    Data     = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+    Response = thrift_transport:write(Wrapped, Data),
+
+    thrift_transport:flush(Wrapped),
+
+    State1 = State#framed_transport{write_buffer = []},
+    {Response, State1}.
+
+min(A,B) when A<B -> A;
+min(_,B)          -> B.
+
diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl
index 70367d6..0b463cc 100644
--- a/lib/alterl/src/thrift_socket_transport.erl
+++ b/lib/alterl/src/thrift_socket_transport.erl
@@ -23,8 +23,8 @@
         end,
     thrift_transport:new(?MODULE, State).
 
-write(#data{socket = Socket}, Data)
-  when is_binary(Data) ->
+%% Data :: iolist()
+write(#data{socket = Socket}, Data) ->
     gen_tcp:send(Socket, Data).
 
 read(#data{socket=Socket, recv_timeout=Timeout}, Len)
diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl
index 919927d..4bbb0a2 100644
--- a/lib/alterl/src/thrift_transport.erl
+++ b/lib/alterl/src/thrift_transport.erl
@@ -22,7 +22,8 @@
     {ok, #transport{module = Module,
                     data = Data}}.
 
-write(Transport, Data) when is_binary(Data) ->
+%% Data :: iolist()
+write(Transport, Data) ->
     Module = Transport#transport.module,
     Module:write(Transport#transport.data, Data).