Change alterl thrift_server to use non-blocking TCP calls and properly set the processor as the controlling process for the client sockets.

Summary:
  - Removes the non-OTP "acceptor" process
  - The processor becomes the socket's controlling process instead of the transport, which is kind of messy, but it means we don't have to make a process for the socket_transport.
  - See http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles for non-blocking server info

Test plan:
  - Ran ThriftTest and StressTest


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666417 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl
index 54c7085..8a1704f 100644
--- a/lib/alterl/src/thrift_server.erl
+++ b/lib/alterl/src/thrift_server.erl
@@ -10,7 +10,7 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/3, stop/1]).
+-export([start_link/3, stop/1, take_socket/2]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -18,7 +18,7 @@
 
 -define(SERVER, ?MODULE).
 
--record(state, {listen_socket, acceptor, service}).
+-record(state, {listen_socket, acceptor_ref, service, handler}).
 
 %%====================================================================
 %% API
@@ -39,6 +39,10 @@
     gen_server:call(Pid, stop).
 
 
+take_socket(Server, Socket) ->
+    gen_server:call(Server, {take_socket, Socket}).
+
+
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
@@ -57,10 +61,11 @@
                                    {active, false},
                                    {nodelay, true},
                                    {reuseaddr, true}]),
-    Acceptor = spawn_link(fun () -> acceptor(Socket, Service, Handler) end),
+    {ok, Ref} = prim_inet:async_accept(Socket, -1),
     {ok, #state{listen_socket = Socket,
-                acceptor = Acceptor,
-                service = Service}}.
+                acceptor_ref = Ref,
+                service = Service,
+                handler = Handler}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -72,8 +77,11 @@
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
 handle_call(stop, _From, State) ->
-    State#state.acceptor ! stop,
-    {stop, stopped, ok, State}.
+    {stop, stopped, ok, State};
+
+handle_call({take_socket, Socket}, {FromPid, _Tag}, State) ->
+    Result = gen_tcp:controlling_process(Socket, FromPid),
+    {reply, Result, State}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -90,6 +98,27 @@
 %%                                       {stop, Reason, State}
 %% Description: Handling all non call/cast messages
 %%--------------------------------------------------------------------
+handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}},
+            State = #state{listen_socket = ListenSocket,
+                           acceptor_ref = Ref,
+                           service = Service,
+                           handler = Handler}) ->
+    case set_sockopt(ListenSocket, ClientSocket) of
+        ok ->
+            %% New client connected - start processor
+            start_processor(ClientSocket, Service, Handler),
+            {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1),
+            {noreply, State#state{acceptor_ref = NewRef}};
+        {error, Reason} ->
+            error_logger:error_msg("Couldn't set socket opts: ~p~n",
+                                   [Reason]),
+            {stop, Reason, State}
+    end;
+
+handle_info({inet_async, ListenSocket, Ref, Error}, State) ->
+    error_logger:error_msg("Error in acceptor: ~p~n", [Error]),
+    {stop, Error, State};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -108,31 +137,38 @@
 %% Description: Convert process state when code is changed
 %%--------------------------------------------------------------------
 code_change(_OldVsn, State, _Extra) ->
-    State#state.acceptor ! refresh,
     {ok, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
+set_sockopt(ListenSocket, ClientSocket) ->
+    true = inet_db:register_socket(ClientSocket, inet_tcp),
+    case prim_inet:getopts(ListenSocket,
+                           [active, nodelay, keepalive, delay_send, priority, tos]) of
+        {ok, Opts} ->
+            case prim_inet:setopts(ClientSocket, Opts) of
+                ok    -> ok;
+                Error -> gen_tcp:close(ClientSocket),
+                         Error
+            end;
+        Error ->
+            gen_tcp:close(ClientSocket),
+            Error
+    end.
 
-acceptor(ListenSocket, Service, Handler)
-  when is_port(ListenSocket), is_atom(Handler) ->
-    {ok, Socket} = gen_tcp:accept(ListenSocket),
-%    error_logger:info_msg("Accepted client"),
+
+
+start_processor(Socket, Service, Handler) ->
+    Server = self(),
 
     ProtoGen = fun() ->
+                       % Become the controlling process
+                       ok = take_socket(Server, Socket),
                        {ok, SocketTransport} = thrift_socket_transport:new(Socket),
                        {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
                        {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport),
                        {ok, Protocol, Protocol}
                end,
-
-    thrift_processor:start(ProtoGen, Service, Handler),
-    receive
-        refresh ->
-            error_logger:info_msg("Acceptor refreshing~n"),
-            ?MODULE:acceptor(ListenSocket, Service, Handler);
-        stop ->
-            ok
-    after 0      -> acceptor(ListenSocket, Service, Handler)
-    end.
+    
+    thrift_processor:start(ProtoGen, Service, Handler).
diff --git a/test/erl/Makefile b/test/erl/Makefile
index 017cdc1..42572d2 100644
--- a/test/erl/Makefile
+++ b/test/erl/Makefile
@@ -7,7 +7,7 @@
 TARGETDIR=ebin
 SRCDIR=src
 
-ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include
+ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/alterl/include
 INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR})
 
 MODULES = stress_server test_server
diff --git a/test/erl/src/stress_server.erl b/test/erl/src/stress_server.erl
index 915b027..d82f940 100644
--- a/test/erl/src/stress_server.erl
+++ b/test/erl/src/stress_server.erl
@@ -1,8 +1,7 @@
 -module(stress_server).
 
--include("thrift.hrl").
 
--export([start_link/1, old_start_link/1,
+-export([start_link/1,
 
          handle_function/2,
 
@@ -19,24 +18,6 @@
 start_link(Port) ->
     thrift_server:start_link(Port, service_thrift, ?MODULE).
 
-% Start the server with the old style bindings
-old_start_link(Port) ->
-    Handler   = ?MODULE,
-    Processor = service_thrift,
-
-    TF = tBufferedTransportFactory:new(),
-    PF = tBinaryProtocolFactory:new(),
-
-    ServerTransport = tErlAcceptor,
-    ServerFlavor    = tErlServer,
-
-    Server = oop:start_new(ServerFlavor, [Port, Handler, Processor, ServerTransport, TF, PF]),
-
-    case ?R0(Server, effectful_serve) of
-	ok    -> Server;
-	Error -> Error
-    end.
-    
 
 handle_function(Function, Args) ->
     case apply(?MODULE, Function, tuple_to_list(Args)) of