THRIFT-211. erlang: Support unlinked Thrift clients.

- Create a thrift_client:start function that accepts client options.
- Make start_link a wrapper that adds {monitor, link}.
- Add a test to make sure that everything dies or doesn't die as expected.
  (The test has to be run manually.)

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@781634 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
index 5ba8aee..d70df34 100644
--- a/lib/erl/src/thrift_client.erl
+++ b/lib/erl/src/thrift_client.erl
@@ -22,7 +22,9 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/2, start_link/3, start_link/4, call/3, send_call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4,
+         start/3, start/4,
+         call/3, send_call/3, close/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -39,11 +41,16 @@
 %%====================================================================
 %%--------------------------------------------------------------------
 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
+%% Description: Starts the server as a linked process.
 %%--------------------------------------------------------------------
 start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
     start_link(Host, Port, Service, []).
 
+start_link(Host, Port, Service, Options) ->
+    start(Host, Port, Service, [{monitor, link} | Options]).
+
+start_link(ProtocolFactory, Service) ->
+    start(ProtocolFactory, Service, [{monitor, link}]).
 
 %%
 %% Splits client options into protocol options and transport options
@@ -51,27 +58,36 @@
 %% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
 %%
 split_options(Options) ->
-    split_options(Options, [], []).
+    split_options(Options, [], [], []).
 
-split_options([], ProtoIn, TransIn) ->
-    {ProtoIn, TransIn};
+split_options([], ClientIn, ProtoIn, TransIn) ->
+    {ClientIn, ProtoIn, TransIn};
 
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+  when OptKey =:= monitor ->
+    split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
   when OptKey =:= strict_read;
        OptKey =:= strict_write ->
-    split_options(Rest, [Opt | ProtoIn], TransIn);
+    split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
 
-split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn)
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
   when OptKey =:= framed;
        OptKey =:= connect_timeout;
        OptKey =:= sockopts ->
-    split_options(Rest, ProtoIn, [Opt | TransIn]).
+    split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
 
 
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as an unlinked process.
+%%--------------------------------------------------------------------
+
 %% Backwards-compatible starter for the common-case of socket transports
-start_link(Host, Port, Service, Options)
+start(Host, Port, Service, Options)
   when is_integer(Port), is_atom(Service), is_list(Options) ->
-    {ProtoOpts, TransOpts} = split_options(Options),
+    {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
 
     {ok, TransportFactory} =
         thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
@@ -79,13 +95,21 @@
     {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
                               TransportFactory, ProtoOpts),
 
-    start_link(ProtocolFactory, Service).
+    start(ProtocolFactory, Service, ClientOpts).
 
 
 %% ProtocolFactory :: fun() -> thrift_protocol()
-start_link(ProtocolFactory, Service)
+start(ProtocolFactory, Service, ClientOpts)
   when is_function(ProtocolFactory), is_atom(Service) ->
-    case gen_server:start_link(?MODULE, [Service], []) of
+    Starter =
+        case lists:keysearch(monitor, 1, ClientOpts) of
+            {value, {monitor, link}} ->
+                start_link;
+            _ ->
+                start
+        end,
+
+    case gen_server:Starter(?MODULE, [Service], []) of
         {ok, Pid} ->
             case gen_server:call(Pid, {connect, ProtocolFactory}) of
                 ok ->
diff --git a/test/erl/Makefile b/test/erl/Makefile
index 17e30da..2126037 100644
--- a/test/erl/Makefile
+++ b/test/erl/Makefile
@@ -29,7 +29,7 @@
 ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include
 INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR})
 
-MODULES = stress_server test_server test_disklog test_membuffer
+MODULES = stress_server test_server test_disklog test_membuffer test_tether
 
 INCLUDES = 
 TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES})
diff --git a/test/erl/src/test_tether.erl b/test/erl/src/test_tether.erl
new file mode 100644
index 0000000..07d1ee9
--- /dev/null
+++ b/test/erl/src/test_tether.erl
@@ -0,0 +1,114 @@
+%% Tests the behavior of clients in the face of transport errors.
+%% Makes sure start, start_linked, and start_tethered work as expected.
+
+-module(test_tether).
+
+-compile(export_all).
+
+t() ->
+    io:format("Starting.~n", []),
+    register(tester, self()),
+
+    Pid1 = erlang:spawn(?MODULE, test_start, []),
+    receive after 200 -> ok end,  % Wait for completion.
+    case is_up(Pid1) of
+        true ->
+            io:format("PASS.  Unlinked owner still alive.~n");
+        false ->
+            io:format("FAIL.  Unlinked owner is dead.~n")
+    end,
+
+    Pid2 = erlang:spawn(?MODULE, test_linked, []),
+    receive after 200 -> ok end,  % Wait for completion.
+    case is_up(Pid2) of
+        true ->
+            io:format("FAIL.  Linked owner still alive.~n");
+        false ->
+            io:format("PASS.  Linked owner is dead.~n")
+    end,
+
+    check_extras(2),
+
+    erlang:halt().
+
+is_up(Pid) ->
+    MonitorRef = erlang:monitor(process, Pid),
+    receive
+        {'DOWN', MonitorRef, process, Pid, _Info} ->
+            false
+    after
+        50 ->
+            erlang:demonitor(MonitorRef),
+            true
+    end.
+
+check_extras(0) -> ok;
+check_extras(N) ->
+    receive
+        {client, Type, Pid} ->
+            case {Type, is_up(Pid)} of
+                {unlinked, true} ->
+                    io:format("PASS.  Unlinked client still alive.~n");
+                {unlinked, false} ->
+                    io:format("FAIL.  Unlinked client dead.~n");
+                {linked, true} ->
+                    io:format("FAIL.  Linked client still alive.~n");
+                {linked, false} ->
+                    io:format("PASS.  Linked client dead.~n")
+            end,
+            check_extras(N-1)
+    after
+        500 ->
+            io:format("FAIL.  Expected ~p more clients.~n", [N])
+    end.
+
+make_protocol_factory(Port) ->
+    {ok, TransportFactory} =
+        thrift_socket_transport:new_transport_factory(
+          "127.0.0.1", Port, []),
+    {ok, ProtocolFactory} =
+        thrift_binary_protocol:new_protocol_factory(
+          TransportFactory, []),
+    ProtocolFactory.
+
+
+test_start() ->
+    {ok, Client1} = gen_server:start(thrift_client, [thriftTest_thrift], []),
+    tester ! {client, unlinked, Client1},
+    {ok, Client2} = gen_server:start(thrift_client, [thriftTest_thrift], []),
+    io:format("PASS.  Unlinked clients created.~n"),
+    try
+        gen_server:call(Client2, {connect, make_protocol_factory(2)}),
+        io:format("FAIL.  Unlinked client connected.~n", [])
+    catch
+        Kind:Info ->
+            io:format("PASS.  Caught unlinked error.  ~p:~p~n", [Kind, Info])
+    end,
+    receive after 100 ->
+                    io:format("PASS.  Still alive after unlinked death.~n"),
+                    %% Hang around a little longer so our parent can verify.
+                    receive after 200 -> ok end
+    end,
+    %% Exit abnormally to not kill our unlinked extra client.
+    exit(die).
+
+test_linked() ->
+    {ok, Client1} = gen_server:start_link(thrift_client, [thriftTest_thrift], []),
+    tester ! {client, linked, Client1},
+    {ok, Client2} = gen_server:start_link(thrift_client, [thriftTest_thrift], []),
+    io:format("PASS.  Linked clients created.~n"),
+    try
+        gen_server:call(Client2, {connect, make_protocol_factory(2)}),
+        io:format("FAIL.  Linked client connected.~n", [])
+    catch
+        Kind:Info ->
+            io:format("FAIL.  Caught linked error.  ~p:~p~n", [Kind, Info])
+    end,
+    receive after 100 ->
+                    io:format("FAIL.  Still alive after linked death.~n"),
+                    % Hang around a little longer so our parent can verify.
+                    receive after 200 -> ok end
+    end,
+    %% Exit abnormally to kill our linked extra client.
+    %% But we should never get here.
+    exit(die).