THRIFT-211. erlang: Support unlinked Thrift clients.
- Create a thrift_client:start function that accepts client options.
- Make start_link a wrapper that adds {monitor, link}.
- Add a test to make sure that everything dies or doesn't die as expected.
(The test has to be run manually.)
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@781634 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
index 5ba8aee..d70df34 100644
--- a/lib/erl/src/thrift_client.erl
+++ b/lib/erl/src/thrift_client.erl
@@ -22,7 +22,9 @@
-behaviour(gen_server).
%% API
--export([start_link/2, start_link/3, start_link/4, call/3, send_call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4,
+ start/3, start/4,
+ call/3, send_call/3, close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -39,11 +41,16 @@
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
+%% Description: Starts the server as a linked process.
%%--------------------------------------------------------------------
start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
start_link(Host, Port, Service, []).
+start_link(Host, Port, Service, Options) ->
+ start(Host, Port, Service, [{monitor, link} | Options]).
+
+start_link(ProtocolFactory, Service) ->
+ start(ProtocolFactory, Service, [{monitor, link}]).
%%
%% Splits client options into protocol options and transport options
@@ -51,27 +58,36 @@
%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
%%
split_options(Options) ->
- split_options(Options, [], []).
+ split_options(Options, [], [], []).
-split_options([], ProtoIn, TransIn) ->
- {ProtoIn, TransIn};
+split_options([], ClientIn, ProtoIn, TransIn) ->
+ {ClientIn, ProtoIn, TransIn};
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= monitor ->
+ split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
when OptKey =:= strict_read;
OptKey =:= strict_write ->
- split_options(Rest, [Opt | ProtoIn], TransIn);
+ split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
when OptKey =:= framed;
OptKey =:= connect_timeout;
OptKey =:= sockopts ->
- split_options(Rest, ProtoIn, [Opt | TransIn]).
+ split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as an unlinked process.
+%%--------------------------------------------------------------------
+
%% Backwards-compatible starter for the common-case of socket transports
-start_link(Host, Port, Service, Options)
+start(Host, Port, Service, Options)
when is_integer(Port), is_atom(Service), is_list(Options) ->
- {ProtoOpts, TransOpts} = split_options(Options),
+ {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
{ok, TransportFactory} =
thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
@@ -79,13 +95,21 @@
{ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
TransportFactory, ProtoOpts),
- start_link(ProtocolFactory, Service).
+ start(ProtocolFactory, Service, ClientOpts).
%% ProtocolFactory :: fun() -> thrift_protocol()
-start_link(ProtocolFactory, Service)
+start(ProtocolFactory, Service, ClientOpts)
when is_function(ProtocolFactory), is_atom(Service) ->
- case gen_server:start_link(?MODULE, [Service], []) of
+ Starter =
+ case lists:keysearch(monitor, 1, ClientOpts) of
+ {value, {monitor, link}} ->
+ start_link;
+ _ ->
+ start
+ end,
+
+ case gen_server:Starter(?MODULE, [Service], []) of
{ok, Pid} ->
case gen_server:call(Pid, {connect, ProtocolFactory}) of
ok ->
diff --git a/test/erl/Makefile b/test/erl/Makefile
index 17e30da..2126037 100644
--- a/test/erl/Makefile
+++ b/test/erl/Makefile
@@ -29,7 +29,7 @@
ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include
INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR})
-MODULES = stress_server test_server test_disklog test_membuffer
+MODULES = stress_server test_server test_disklog test_membuffer test_tether
INCLUDES =
TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES})
diff --git a/test/erl/src/test_tether.erl b/test/erl/src/test_tether.erl
new file mode 100644
index 0000000..07d1ee9
--- /dev/null
+++ b/test/erl/src/test_tether.erl
@@ -0,0 +1,114 @@
+%% Tests the behavior of clients in the face of transport errors.
+%% Makes sure start, start_linked, and start_tethered work as expected.
+
+-module(test_tether).
+
+-compile(export_all).
+
+t() ->
+ io:format("Starting.~n", []),
+ register(tester, self()),
+
+ Pid1 = erlang:spawn(?MODULE, test_start, []),
+ receive after 200 -> ok end, % Wait for completion.
+ case is_up(Pid1) of
+ true ->
+ io:format("PASS. Unlinked owner still alive.~n");
+ false ->
+ io:format("FAIL. Unlinked owner is dead.~n")
+ end,
+
+ Pid2 = erlang:spawn(?MODULE, test_linked, []),
+ receive after 200 -> ok end, % Wait for completion.
+ case is_up(Pid2) of
+ true ->
+ io:format("FAIL. Linked owner still alive.~n");
+ false ->
+ io:format("PASS. Linked owner is dead.~n")
+ end,
+
+ check_extras(2),
+
+ erlang:halt().
+
+is_up(Pid) ->
+ MonitorRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MonitorRef, process, Pid, _Info} ->
+ false
+ after
+ 50 ->
+ erlang:demonitor(MonitorRef),
+ true
+ end.
+
+check_extras(0) -> ok;
+check_extras(N) ->
+ receive
+ {client, Type, Pid} ->
+ case {Type, is_up(Pid)} of
+ {unlinked, true} ->
+ io:format("PASS. Unlinked client still alive.~n");
+ {unlinked, false} ->
+ io:format("FAIL. Unlinked client dead.~n");
+ {linked, true} ->
+ io:format("FAIL. Linked client still alive.~n");
+ {linked, false} ->
+ io:format("PASS. Linked client dead.~n")
+ end,
+ check_extras(N-1)
+ after
+ 500 ->
+ io:format("FAIL. Expected ~p more clients.~n", [N])
+ end.
+
+make_protocol_factory(Port) ->
+ {ok, TransportFactory} =
+ thrift_socket_transport:new_transport_factory(
+ "127.0.0.1", Port, []),
+ {ok, ProtocolFactory} =
+ thrift_binary_protocol:new_protocol_factory(
+ TransportFactory, []),
+ ProtocolFactory.
+
+
+test_start() ->
+ {ok, Client1} = gen_server:start(thrift_client, [thriftTest_thrift], []),
+ tester ! {client, unlinked, Client1},
+ {ok, Client2} = gen_server:start(thrift_client, [thriftTest_thrift], []),
+ io:format("PASS. Unlinked clients created.~n"),
+ try
+ gen_server:call(Client2, {connect, make_protocol_factory(2)}),
+ io:format("FAIL. Unlinked client connected.~n", [])
+ catch
+ Kind:Info ->
+ io:format("PASS. Caught unlinked error. ~p:~p~n", [Kind, Info])
+ end,
+ receive after 100 ->
+ io:format("PASS. Still alive after unlinked death.~n"),
+ %% Hang around a little longer so our parent can verify.
+ receive after 200 -> ok end
+ end,
+ %% Exit abnormally to not kill our unlinked extra client.
+ exit(die).
+
+test_linked() ->
+ {ok, Client1} = gen_server:start_link(thrift_client, [thriftTest_thrift], []),
+ tester ! {client, linked, Client1},
+ {ok, Client2} = gen_server:start_link(thrift_client, [thriftTest_thrift], []),
+ io:format("PASS. Linked clients created.~n"),
+ try
+ gen_server:call(Client2, {connect, make_protocol_factory(2)}),
+ io:format("FAIL. Linked client connected.~n", [])
+ catch
+ Kind:Info ->
+ io:format("FAIL. Caught linked error. ~p:~p~n", [Kind, Info])
+ end,
+ receive after 100 ->
+ io:format("FAIL. Still alive after linked death.~n"),
+ % Hang around a little longer so our parent can verify.
+ receive after 200 -> ok end
+ end,
+ %% Exit abnormally to kill our linked extra client.
+ %% But we should never get here.
+ exit(die).