%%%-------------------------------------------------------------------
%%% File        : thrift_server.erl
%%% Author      : <todd@lipcon.org>
%%% Description : gen_server that listens on a TCP port, accepts
%%%               connections asynchronously and starts a
%%%               thrift_processor for each accepted client.
%%%
%%% Created     : 28 Jan 2008 by <todd@lipcon.org>
%%%-------------------------------------------------------------------
-module(thrift_server).
-behaviour(gen_server).

%% Public API.
-export([start_link/3,
         stop/1,
         take_socket/2]).

%% gen_server callbacks.
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Name under which start_link/3 registers the server locally.
-define(SERVER, ?MODULE).

%% Server state:
%%   listen_socket - socket returned by gen_tcp:listen/2 in init/1
%%   acceptor_ref  - reference of the pending prim_inet:async_accept/2
%%   service       - service term handed to thrift_processor:start/3
%%   handler       - handler module handed to thrift_processor:start/3
-record(state, {listen_socket,
                acceptor_ref,
                service,
                handler}).
| |
| %%==================================================================== |
| %% API |
| %%==================================================================== |
%%--------------------------------------------------------------------
%% Function: start_link(Port, Service, HandlerModule) ->
%%               {ok, Pid} | ignore | {error, Error}
%% Description: Starts the server linked to the caller and registers it
%% locally as ?SERVER. Port must be an integer and HandlerModule an
%% atom; the three arguments reach init/1 packed in a single tuple.
%%--------------------------------------------------------------------
-spec start_link(integer(), term(), atom()) ->
          {ok, pid()} | ignore | {error, term()}.
start_link(Port, Service, HandlerModule)
  when is_integer(Port), is_atom(HandlerModule) ->
    InitArg = {Port, Service, HandlerModule},
    gen_server:start_link({local, ?SERVER}, ?MODULE, InitArg, []).
| |
| |
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Synchronously asks the server process Pid to stop and
%% returns its reply. Only a pid is accepted; any other argument fails
%% with function_clause.
%%--------------------------------------------------------------------
-spec stop(pid()) -> ok.
stop(ServerPid) when is_pid(ServerPid) ->
    gen_server:call(ServerPid, stop).
| |
| |
%%--------------------------------------------------------------------
%% Function: take_socket(Server, Socket) -> ok | {error, Reason}
%% Description: Asks Server to make the calling process the controlling
%% process of Socket. The reply is whatever
%% gen_tcp:controlling_process/2 returned inside the server.
%%--------------------------------------------------------------------
-spec take_socket(term(), gen_tcp:socket()) -> ok | {error, term()}.
take_socket(Server, Socket) ->
    Request = {take_socket, Socket},
    gen_server:call(Server, Request).
| |
| |
| %%==================================================================== |
| %% gen_server callbacks |
| %%==================================================================== |
| |
%%--------------------------------------------------------------------
%% Function: init({Port, Service, Handler}) -> {ok, State} |
%%                                             {stop, Reason}
%% Description: Opens a passive, binary listen socket on Port and posts
%% the first asynchronous accept on it. On success the state records
%% the listen socket, the pending accept reference, Service and
%% Handler. If listening or posting the accept fails, the server is
%% not started and start_link/3 returns {error, Reason}.
%%--------------------------------------------------------------------
init({Port, Service, Handler}) ->
    ListenOpts = [binary,
                  {packet, 0},
                  {active, false},
                  {nodelay, true},
                  {reuseaddr, true}],
    case gen_tcp:listen(Port, ListenOpts) of
        {ok, Socket} ->
            init_acceptor(Socket, Service, Handler);
        {error, ListenError} ->
            %% Report the plain reason (e.g. eaddrinuse) instead of
            %% crashing init/1 with a badmatch.
            {stop, ListenError}
    end.

%% Posts the first asynchronous accept on Socket and builds the initial
%% state; closes Socket again if the accept cannot be posted.
%% NOTE(review): -1 is passed as the accept timeout - presumably
%% "wait forever"; confirm against prim_inet.
init_acceptor(Socket, Service, Handler) ->
    case prim_inet:async_accept(Socket, -1) of
        {ok, Ref} ->
            {ok, #state{listen_socket = Socket,
                        acceptor_ref = Ref,
                        service = Service,
                        handler = Handler}};
        {error, AcceptError} ->
            gen_tcp:close(Socket),
            {stop, AcceptError}
    end.
| |
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) ->
%%               {reply, Reply, State} | {stop, Reason, Reply, State}
%% Description: Handles the two synchronous requests of this server:
%%   stop                  - replies ok and stops the server with
%%                           reason 'stopped'
%%   {take_socket, Socket} - makes the calling process the controlling
%%                           process of Socket and replies with the
%%                           result of gen_tcp:controlling_process/2
%%--------------------------------------------------------------------
handle_call(stop, _Caller, ServerState) ->
    {stop, stopped, ok, ServerState};
handle_call({take_socket, Sock}, {CallerPid, _}, ServerState) ->
    {reply, gen_tcp:controlling_process(Sock, CallerPid), ServerState}.
| |
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: The server defines no casts; every cast message is
%% dropped and the state is returned untouched.
%%--------------------------------------------------------------------
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.
| |
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {stop, Reason, State}
%% Description: Handles the {inet_async, ...} messages that answer the
%% accept posted with prim_inet:async_accept/2. Every other message is
%% ignored.
%%--------------------------------------------------------------------

%% A client was accepted: the message carries both our listen socket
%% and the reference of the accept stored in the state.
handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}},
            State = #state{listen_socket = ListenSocket,
                           acceptor_ref = Ref,
                           service = Service,
                           handler = Handler}) ->
    case set_sockopt(ListenSocket, ClientSocket) of
        ok ->
            %% New client connected - start processor, then post the
            %% next accept and remember its reference.
            start_processor(ClientSocket, Service, Handler),
            {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1),
            {noreply, State#state{acceptor_ref = NewRef}};
        {error, Reason} ->
            error_logger:error_msg("Couldn't set socket opts: ~p~n",
                                   [Reason]),
            {stop, Reason, State}
    end;

%% Any other inet_async message is treated as an acceptor failure and
%% stops the server. The socket and reference are deliberately not
%% matched against the state, so they are `_'-prefixed to say so and
%% to avoid unused-variable warnings.
handle_info({inet_async, _ListenSocket, _Ref, Error}, State) ->
    error_logger:error_msg("Error in acceptor: ~p~n", [Error]),
    {stop, Error, State};

%% Unrelated messages are dropped.
handle_info(_Info, State) ->
    {noreply, State}.
| |
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by gen_server when the server is about to
%% terminate. This module performs no explicit cleanup here; both
%% arguments are ignored and ok is returned.
%%--------------------------------------------------------------------
terminate(_Why, _ServerState) ->
    ok.
| |
%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Code-upgrade hook. The state needs no conversion, so
%% it is handed back unchanged.
%%--------------------------------------------------------------------
code_change(_FromVsn, ServerState, _UpgradeExtra) ->
    {ok, ServerState}.
| |
| %%-------------------------------------------------------------------- |
| %%% Internal functions |
| %%-------------------------------------------------------------------- |
%% Registers ClientSocket with inet_db as an inet_tcp socket and copies
%% the listen socket's active, nodelay, keepalive, delay_send, priority
%% and tos options onto it. Returns ok on success. If reading or
%% applying the options fails, ClientSocket is closed and the failing
%% call's result is returned.
set_sockopt(ListenSocket, ClientSocket) ->
    true = inet_db:register_socket(ClientSocket, inet_tcp),
    Inherited = [active, nodelay, keepalive, delay_send, priority, tos],
    case inherit_opts(ListenSocket, ClientSocket, Inherited) of
        ok ->
            ok;
        Failure ->
            gen_tcp:close(ClientSocket),
            Failure
    end.

%% Reads the options named in Names from socket From and applies them
%% to socket To; returns the first non-successful result, if any.
inherit_opts(From, To, Names) ->
    case prim_inet:getopts(From, Names) of
        {ok, Values} -> prim_inet:setopts(To, Values);
        GetFailure -> GetFailure
    end.
| |
| |
| |
%% Starts a thrift_processor for an accepted client socket.
%% The processor is given a fun that first asks this server (via
%% take_socket/2) to make the fun's caller the controlling process of
%% Socket, then stacks a socket transport, a buffered transport and a
%% binary protocol on top of it. The same protocol is returned twice in
%% the {ok, _, _} result.
%% NOTE(review): the fun is presumably evaluated inside the process
%% that thrift_processor:start/3 creates - confirm against
%% thrift_processor.
start_processor(Socket, Service, Handler) ->
    ServerPid = self(),
    MakeProtocol =
        fun() ->
                %% Become the controlling process
                ok = take_socket(ServerPid, Socket),
                {ok, Raw} = thrift_socket_transport:new(Socket),
                {ok, Buffered} = thrift_buffered_transport:new(Raw),
                {ok, Proto} = thrift_binary_protocol:new(Buffered),
                {ok, Proto, Proto}
        end,
    thrift_processor:start(MakeProtocol, Service, Handler).