Modify thrift_client to take in a "connector" function as a parameter, enabling substitution of different protocol/transports
Summary:
Left in a backwards-compatible start_link with (Host, Port, Service) args
Test plan:
tutorial/alterl still works
Notes:
We may want to go a little further and get rid of the binary_protocol specific stuff from socket_transport as well
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666460 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_client.erl b/lib/alterl/src/thrift_client.erl
index dee91fb..a2cc56b 100644
--- a/lib/alterl/src/thrift_client.erl
+++ b/lib/alterl/src/thrift_client.erl
@@ -10,7 +10,7 @@
-behaviour(gen_server).
%% API
--export([start_link/3, start_link/4, call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4, call/3, close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -20,13 +20,7 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(state, {service, protocol, seqid,
- strict_read = true,
- strict_write = true,
- framed = false,
- connect_timeout = infinity,
- sockopts = []
- }).
+-record(state, {service, protocol, seqid}).
%%====================================================================
%% API
@@ -35,14 +29,21 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
-start_link(Host, Port, Service) ->
+start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
start_link(Host, Port, Service, []).
+%% Backwards-compatible starter for the usual case of socket transports
start_link(Host, Port, Service, Options)
when is_integer(Port), is_atom(Service), is_list(Options) ->
- case gen_server:start_link(?MODULE, [Options], []) of
+ {ok, Connector} = thrift_socket_transport:new_connector(Host, Port, Options),
+ start_link(Connector, Service).
+
+%% Connector :: fun() -> thrift_protocol()
+start_link(Connector, Service)
+ when is_function(Connector), is_atom(Service) ->
+ case gen_server:start_link(?MODULE, [Service], []) of
{ok, Pid} ->
- case gen_server:call(Pid, {connect, Host, Port, Service}) of
+ case gen_server:call(Pid, {connect, Connector}) of
ok ->
{ok, Pid};
Error ->
@@ -74,22 +75,8 @@
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
-init([Options]) ->
- State = parse_options(Options, #state{}),
- {ok, State}.
-
-parse_options([], State) ->
- State;
-parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
- parse_options(Rest, State#state{strict_read=Bool});
-parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
- parse_options(Rest, State#state{strict_write=Bool});
-parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) ->
- parse_options(Rest, State#state{framed=Bool});
-parse_options([{sockopts, OptList} | Rest], State) when is_list(OptList) ->
- parse_options(Rest, State#state{sockopts=OptList});
-parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_integer(TO) ->
- parse_options(Rest, State#state{connect_timeout=TO}).
+init([Service]) ->
+ {ok, #state{service = Service}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -100,30 +87,12 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({connect, Host, Port, Service}, _From,
- State = #state{connect_timeout=Timeout,
- sockopts=SockOpts}) ->
- Options = [binary,
- {packet, 0},
- {active, false},
- {nodelay, true}
- | SockOpts
- ],
- case catch gen_tcp:connect(Host, Port, Options, Timeout) of
- {ok, Sock} ->
- {ok, Transport} = thrift_socket_transport:new(Sock),
- {ok, BufTransport} =
- case State#state.framed of
- true -> thrift_framed_transport:new(Transport);
- false -> thrift_buffered_transport:new(Transport)
- end,
- {ok, Protocol} = thrift_binary_protocol:new(BufTransport,
- [{strict_read, State#state.strict_read},
- {strict_write, State#state.strict_write}]),
-
- {reply, ok, State#state{service = Service,
- protocol = Protocol,
- seqid = 0}};
+handle_call({connect, Connector}, _From,
+ State = #state{service = Service}) ->
+ case Connector() of
+ {ok, Protocol} ->
+ {reply, ok, State#state{protocol = Protocol,
+ seqid = 0}};
Error ->
{stop, normal, Error, State}
end;
diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl
index 9cc0af9..bdae28b 100644
--- a/lib/alterl/src/thrift_socket_transport.erl
+++ b/lib/alterl/src/thrift_socket_transport.erl
@@ -4,7 +4,9 @@
-export([new/1,
new/2,
- write/2, read/2, flush/1, close/1]).
+ write/2, read/2, flush/1, close/1,
+
+ new_connector/3]).
-record(data, {socket,
recv_timeout=infinity}).
@@ -43,3 +45,41 @@
close(#data{socket = Socket}) ->
gen_tcp:close(Socket).
+
+
+%%%% CONNECTOR GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%%
+%% Generates a "connector" function - a fun which returns a Protocol instance.
+%% This can be passed to thrift_client:start_link in order to connect to a
+%% server over a socket.
+%%
+new_connector(Host, Port, Options) ->
+ ConnectTimeout = proplists:get_value(connect_timeout, Options, infinity),
+ InSockOpts = proplists:get_value(sockopts, Options, []),
+ Framed = proplists:get_value(framed, Options, false),
+ StrictRead = proplists:get_value(strict_read, Options, true),
+ StrictWrite = proplists:get_value(strict_write, Options, true),
+
+ F = fun() ->
+ SockOpts = [binary,
+ {packet, 0},
+ {active, false},
+ {nodelay, true} |
+ InSockOpts],
+ case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of
+ {ok, Sock} ->
+ {ok, Transport} = thrift_socket_transport:new(Sock),
+ {ok, BufTransport} =
+ case Framed of
+ true -> thrift_framed_transport:new(Transport);
+ false -> thrift_buffered_transport:new(Transport)
+ end,
+ thrift_binary_protocol:new(BufTransport,
+ [{strict_read, StrictRead},
+ {strict_write, StrictWrite}]);
+ Error ->
+ Error
+ end
+ end,
+ {ok, F}.