Erlang: add framed_transport and non-strict binary_protocol
- thrift_client now takes as its fourth parameter Options: framed, strict_{read,write}, connect_timeout (P.S. fourth param used to be Timeout)
- binary protocol now takes options: strict_{read,write}
- buffers in framed and buffered transport are now iolists and not reversed lists of binaries
- rename buffer in buffered transport "write_buffer" to match framed transport
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666447 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl
index f115b2f..da43d14 100644
--- a/lib/alterl/src/thrift_binary_protocol.erl
+++ b/lib/alterl/src/thrift_binary_protocol.erl
@@ -11,22 +11,37 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--export([new/1,
+-export([new/1, new/2,
read/2,
write/2,
flush_transport/1,
close_transport/1
-]).
+ ]).
--record(binary_protocol, {transport}).
-
+-record(binary_protocol, {transport,
+ strict_read=true,
+ strict_write=true
+ }).
-define(VERSION_MASK, 16#FFFF0000).
-define(VERSION_1, 16#80010000).
-
+-define(TYPE_MASK, 16#000000ff).
new(Transport) ->
- thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+ new(Transport, _Options = []).
+
+new(Transport, Options) ->
+ State = #binary_protocol{transport = Transport},
+ State1 = parse_options(Options, State),
+ thrift_protocol:new(?MODULE, State1).
+
+parse_options([], State) ->
+ State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#binary_protocol{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#binary_protocol{strict_write=Bool}).
+
flush_transport(#binary_protocol{transport = Transport}) ->
thrift_transport:flush(Transport).
@@ -42,9 +57,16 @@
name = Name,
type = Type,
seqid = Seqid}) ->
- write(This, {i32, ?VERSION_1 bor Type}),
- write(This, {string, Name}),
- write(This, {i32, Seqid}),
+ case This#binary_protocol.strict_write of
+ true ->
+ write(This, {i32, ?VERSION_1 bor Type}),
+ write(This, {string, Name}),
+ write(This, {i32, Seqid});
+ false ->
+ write(This, {string, Name}),
+ write(This, {byte, Type}),
+ write(This, {i32, Seqid})
+ end,
ok;
write(This, message_end) -> ok;
@@ -121,20 +143,40 @@
write(This, {i32, size(Bin)}),
write(This, Bin);
-write(This, Binary) when is_binary(Binary) ->
- thrift_transport:write(This#binary_protocol.transport, Binary).
+%% Data :: iolist()
+write(This, Data) ->
+ thrift_transport:write(This#binary_protocol.transport, Data).
%%
read(This, message_begin) ->
case read(This, i32) of
- {ok, Version} when Version band ?VERSION_MASK == ?VERSION_1 ->
- Type = Version band 16#000000ff,
+ {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
+ %% we're at version 1
{ok, Name} = read(This, string),
+ Type = Sz band ?TYPE_MASK,
{ok, SeqId} = read(This, i32),
#protocol_message_begin{name = binary_to_list(Name),
type = Type,
seqid = SeqId};
+
+ {ok, Sz} when Sz < 0 ->
+ %% there's a version number but it's unexpected
+ {error, {bad_binary_protocol_version, Sz}};
+
+ {ok, Sz} when This#binary_protocol.strict_read =:= true ->
+ %% strict_read is true and there's no version header; that's an error
+ {error, no_binary_protocol_version};
+
+ {ok, Sz} when This#binary_protocol.strict_read =:= false ->
+ %% strict_read is false, so just read the old way
+ {ok, Name} = read(This, Sz),
+ {ok, Type} = read(This, byte),
+ {ok, SeqId} = read(This, i32),
+ #protocol_message_begin{name = binary_to_list(Name),
+ type = Type,
+ seqid = SeqId};
+
Err = {error, closed} -> Err;
Err = {error, ebadf} -> Err
end;
diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl
index d79e987..575245d 100644
--- a/lib/alterl/src/thrift_buffered_transport.erl
+++ b/lib/alterl/src/thrift_buffered_transport.erl
@@ -21,11 +21,7 @@
-export([write/2, read/2, flush/1, close/1]).
-record(buffered_transport, {wrapped, % a thrift_transport
- buffer
- %% a list of binaries which will be concatenated and sent during
- %% a flush.
- %%
- %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+ write_buffer % iolist()
}).
%%====================================================================
@@ -46,11 +42,11 @@
%%--------------------------------------------------------------------
%% Function: write(Transport, Data) -> ok
%%
-%% Data = binary()
+%% Data = iolist()
%%
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
gen_server:call(Transport, {write, Data}).
%%--------------------------------------------------------------------
@@ -94,7 +90,7 @@
%% TODO(cpiro): need to trap exits here so when transport exits
%% normally from under our feet we exit normally
{ok, #buffered_transport{wrapped = Wrapped,
- buffer = []}}.
+ write_buffer = []}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +101,18 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer = Buffer}) ->
- {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) ->
+ {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) ->
Response = thrift_transport:read(Wrapped, Len),
{reply, Response, State};
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
wrapped = Wrapped}) ->
- Concat = concat_binary(lists:reverse(Buffer)),
- Response = thrift_transport:write(Wrapped, Concat),
+ Response = thrift_transport:write(Wrapped, WBuf),
thrift_transport:flush(Wrapped),
- {reply, Response, State#buffered_transport{buffer = []}}.
+ {reply, Response, State#buffered_transport{write_buffer = []}}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,9 +120,9 @@
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
wrapped = Wrapped}) ->
- thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+ thrift_transport:write(Wrapped, WBuf),
%% Wrapped is closed by terminate/2
%% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
{stop, normal, State};
diff --git a/lib/alterl/src/thrift_client.erl b/lib/alterl/src/thrift_client.erl
index af7aea9..779595c 100644
--- a/lib/alterl/src/thrift_client.erl
+++ b/lib/alterl/src/thrift_client.erl
@@ -20,7 +20,12 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(state, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid,
+ strict_read = true,
+ strict_write = true,
+ framed = false,
+ connect_timeout = infinity
+ }).
%%====================================================================
%% API
@@ -30,10 +35,10 @@
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Host, Port, Service) ->
- start_link(Host, Port, Service, _Timeout = infinity).
+ start_link(Host, Port, Service, []).
-start_link(Host, Port, Service, Timeout) when is_integer(Port), is_atom(Service) ->
- gen_server:start_link(?MODULE, [Host, Port, Service, Timeout], []).
+start_link(Host, Port, Service, Options) when is_integer(Port), is_atom(Service), is_list(Options) ->
+ gen_server:start_link(?MODULE, [Host, Port, Service, Options], []).
call(Client, Function, Args)
when is_pid(Client), is_atom(Function), is_list(Args) ->
@@ -57,24 +62,41 @@
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
-init([Host, Port, Service]) ->
- init([Host, Port, Service, infinity]);
+init([Host, Port, Service, Options]) ->
+ State = parse_options(Options, #state{}),
-init([Host, Port, Service, Timeout]) ->
{ok, Sock} = gen_tcp:connect(Host, Port,
[binary,
{packet, 0},
{active, false},
{nodelay, true}
],
- Timeout),
+ State#state.connect_timeout),
- {ok, Transport} = thrift_socket_transport:new(Sock),
- {ok, BufTransport} = thrift_buffered_transport:new(Transport),
- {ok, Protocol} = thrift_binary_protocol:new(BufTransport),
- {ok, #state{service = Service,
- protocol = Protocol,
- seqid = 0}}.
+ {ok, Transport} = thrift_socket_transport:new(Sock),
+ {ok, BufTransport} =
+ case State#state.framed of
+ true -> thrift_framed_transport:new(Transport);
+ false -> thrift_buffered_transport:new(Transport)
+ end,
+ {ok, Protocol} = thrift_binary_protocol:new(BufTransport,
+ [{strict_read, State#state.strict_read},
+ {strict_write, State#state.strict_write}]),
+
+ {ok, State#state{service = Service,
+ protocol = Protocol,
+ seqid = 0}}.
+
+parse_options([], State) ->
+ State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{strict_write=Bool});
+parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{framed=Bool});
+parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_integer(TO) ->
+ parse_options(Rest, State#state{connect_timeout=TO}).
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
diff --git a/lib/alterl/src/thrift_framed_transport.erl b/lib/alterl/src/thrift_framed_transport.erl
new file mode 100644
index 0000000..814e0d9
--- /dev/null
+++ b/lib/alterl/src/thrift_framed_transport.erl
@@ -0,0 +1,198 @@
+%%%-------------------------------------------------------------------
+%%% File : thrift_framed_transport.erl
+%%% Author : <cpiro@facebook.com>
+%%% Description : Framed transport for thrift
+%%%
+%%% Created : 12 Mar 2008 by <cpiro@facebook.com>
+%%%-------------------------------------------------------------------
+-module(thrift_framed_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(framed_transport, {wrapped, % a thrift_transport
+ read_buffer, % iolist()
+ write_buffer % iolist()
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+ case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+ {ok, Pid} ->
+ thrift_transport:new(?MODULE, Pid);
+ Else ->
+ Else
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Writes data into the buffer
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+ gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer through to the wrapped transport
+%%--------------------------------------------------------------------
+flush(Transport) ->
+ gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Closes the transport and the wrapped transport
+%%--------------------------------------------------------------------
+close(Transport) ->
+ gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: Read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transoprt
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+ gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+ %% TODO(cpiro): need to trap exits here so when transport exits
+ %% normally from under our feet we exit normally
+ {ok, #framed_transport{wrapped = Wrapped,
+ read_buffer = [],
+ write_buffer = []}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) ->
+ {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
+
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+ read_buffer = RBuf}) ->
+ {RBuf1, RBuf1Size} =
+ %% if the read buffer is empty, read another frame
+ %% otherwise, just read from what's left in the buffer
+ case iolist_size(RBuf) of
+ 0 ->
+ %% read the frame length
+ {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+ thrift_transport:read(Wrapped, 4),
+ %% then read the data
+ {ok, Bin} =
+ thrift_transport:read(Wrapped, FrameLen),
+ {Bin, size(Bin)};
+ Sz ->
+ {RBuf, Sz}
+ end,
+
+ %% pull off Give bytes, return them to the user, leave the rest in the buffer
+ Give = min(RBuf1Size, Len),
+ <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
+
+ Response = {ok, Data},
+ State1 = State#framed_transport{read_buffer=RBuf2},
+
+ {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+ {Response, State1} = do_flush(State),
+ {reply, Response, State1}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+ {_, State1} = do_flush(State),
+ %% Wrapped is closed by terminate/2
+ %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
+ {stop, normal, State};
+handle_cast(Msg, State=#framed_transport{}) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
+ thrift_transport:close(Wrapped),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+ wrapped = Wrapped}) ->
+ FrameLen = iolist_size(Buffer),
+ Data = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+ Response = thrift_transport:write(Wrapped, Data),
+
+ thrift_transport:flush(Wrapped),
+
+ State1 = State#framed_transport{write_buffer = []},
+ {Response, State1}.
+
+min(A,B) when A<B -> A;
+min(_,B) -> B.
+
diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl
index 70367d6..0b463cc 100644
--- a/lib/alterl/src/thrift_socket_transport.erl
+++ b/lib/alterl/src/thrift_socket_transport.erl
@@ -23,8 +23,8 @@
end,
thrift_transport:new(?MODULE, State).
-write(#data{socket = Socket}, Data)
- when is_binary(Data) ->
+%% Data :: iolist()
+write(#data{socket = Socket}, Data) ->
gen_tcp:send(Socket, Data).
read(#data{socket=Socket, recv_timeout=Timeout}, Len)
diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl
index 919927d..4bbb0a2 100644
--- a/lib/alterl/src/thrift_transport.erl
+++ b/lib/alterl/src/thrift_transport.erl
@@ -22,7 +22,8 @@
{ok, #transport{module = Module,
data = Data}}.
-write(Transport, Data) when is_binary(Data) ->
+%% Data :: iolist()
+write(Transport, Data) ->
Module = Transport#transport.module,
Module:write(Transport#transport.data, Data).