| David Reiss | ea2cba8 | 2009-03-30 21:35:00 +0000 | [diff] [blame] | 1 | %% | 
|  | 2 | %% Licensed to the Apache Software Foundation (ASF) under one | 
|  | 3 | %% or more contributor license agreements. See the NOTICE file | 
|  | 4 | %% distributed with this work for additional information | 
|  | 5 | %% regarding copyright ownership. The ASF licenses this file | 
|  | 6 | %% to you under the Apache License, Version 2.0 (the | 
|  | 7 | %% "License"); you may not use this file except in compliance | 
|  | 8 | %% with the License. You may obtain a copy of the License at | 
|  | 9 | %% | 
|  | 10 | %%   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 11 | %% | 
|  | 12 | %% Unless required by applicable law or agreed to in writing, | 
|  | 13 | %% software distributed under the License is distributed on an | 
|  | 14 | %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | 15 | %% KIND, either express or implied. See the License for the | 
|  | 16 | %% specific language governing permissions and limitations | 
|  | 17 | %% under the License. | 
|  | 18 | %% | 
|  | 19 |  | 
| David Reiss | 0c8cb4a | 2008-06-11 01:12:52 +0000 | [diff] [blame] | 20 | -module(thrift_server). | 
|  | 21 |  | 
|  | 22 | -behaviour(gen_server). | 
|  | 23 |  | 
|  | 24 | %% API | 
|  | 25 | -export([start_link/3, stop/1, take_socket/2]). | 
|  | 26 |  | 
|  | 27 | %% gen_server callbacks | 
|  | 28 | -export([init/1, handle_call/3, handle_cast/2, handle_info/2, | 
|  | 29 | terminate/2, code_change/3]). | 
|  | 30 |  | 
%% Name under which start_link/3 registers the server locally
%% (the module name itself).
-define(SERVER, ?MODULE).

%% gen_server state.
%%   listen_socket - listening TCP socket opened in init/1
%%   acceptor_ref  - reference returned by the outstanding
%%                   prim_inet:async_accept/2 request
%%   service       - service term handed to every spawned thrift_processor
%%   handler       - handler module handed to every spawned thrift_processor
-record(state, {listen_socket, acceptor_ref, service, handler}).
|  | 34 |  | 
|  | 35 | %%==================================================================== | 
|  | 36 | %% API | 
|  | 37 | %%==================================================================== | 
%%--------------------------------------------------------------------
%% Function: start_link(Port, Service, HandlerModule) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server as a gen_server registered locally
%% under ?SERVER. Port must be an integer and HandlerModule an atom;
%% the three arguments are handed to init/1 as a single tuple.
%%--------------------------------------------------------------------
start_link(Port, Service, HandlerModule)
  when is_integer(Port), is_atom(HandlerModule) ->
    InitArg = {Port, Service, HandlerModule},
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, InitArg, []).
|  | 44 |  | 
%%--------------------------------------------------------------------
%% Function: stop(Pid) -> ok
%% Description: Synchronously asks the server process Pid to stop and
%% returns its reply. Only pids are accepted (guard); the call exits
%% if the server does not answer.
%%--------------------------------------------------------------------
stop(Pid) when is_pid(Pid) ->
    Request = stop,
    gen_server:call(Pid, Request).
|  | 51 |  | 
|  | 52 |  | 
%%--------------------------------------------------------------------
%% Function: take_socket(Server, Socket) -> ok | {error, Reason}
%% Description: Asks Server to make the calling process the controlling
%% process of Socket. The reply is whatever
%% gen_tcp:controlling_process/2 returned inside the server.
%%--------------------------------------------------------------------
take_socket(Server, Socket) ->
    Request = {take_socket, Socket},
    gen_server:call(Server, Request).
|  | 55 |  | 
|  | 56 |  | 
|  | 57 | %%==================================================================== | 
|  | 58 | %% gen_server callbacks | 
|  | 59 | %%==================================================================== | 
|  | 60 |  | 
%%--------------------------------------------------------------------
%% Function: init({Port, Service, Handler}) -> {ok, State} |
%%                                             {stop, Reason}
%% Description: Opens the listening socket on Port and arms the first
%% asynchronous accept. If either step fails the server is not started
%% and start_link/3 returns {error, Reason} (e.g. eaddrinuse) instead
%% of crashing with a badmatch.
%%--------------------------------------------------------------------
init({Port, Service, Handler}) ->
    ListenOpts = [binary,
                  {packet, 0},
                  {active, false},
                  {nodelay, true},
                  {reuseaddr, true}],
    case gen_tcp:listen(Port, ListenOpts) of
        {ok, Socket} ->
            init_acceptor(Socket, Service, Handler);
        {error, Reason} ->
            {stop, Reason}
    end.

%% Arms the first async accept on Socket and builds the initial state.
%% The listen socket is closed explicitly when the accept cannot be
%% armed, so the failure path leaves nothing open.
init_acceptor(Socket, Service, Handler) ->
    %% NOTE(review): -1 is presumably "no timeout"; prim_inet is an
    %% internal, undocumented module - confirm against the OTP release.
    case prim_inet:async_accept(Socket, -1) of
        {ok, Ref} ->
            {ok, #state{listen_socket = Socket,
                        acceptor_ref = Ref,
                        service = Service,
                        handler = Handler}};
        {error, Reason} ->
            gen_tcp:close(Socket),
            {stop, Reason}
    end.
|  | 80 |  | 
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) ->
%%               {reply, Reply, State} | {stop, Reason, Reply, State}
%% Description: Handling call messages
%%   stop                  - replies ok and terminates the server with
%%                           reason normal, so a requested shutdown is
%%                           not reported as a crash and does not take
%%                           linked processes down with it.
%%   {take_socket, Socket} - makes the calling process the controlling
%%                           process of Socket and replies with the
%%                           result of gen_tcp:controlling_process/2.
%%--------------------------------------------------------------------
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

handle_call({take_socket, Socket}, {FromPid, _Tag}, State) ->
    Result = gen_tcp:controlling_process(Socket, FromPid),
    {reply, Result, State}.
|  | 96 |  | 
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: Handling cast messages. The server defines no casts:
%% every message is dropped and the state is returned untouched.
%%--------------------------------------------------------------------
handle_cast(_Ignored, Unchanged) ->
    {noreply, Unchanged}.
|  | 105 |  | 
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%   - inet_async success for the outstanding accept: copies the
%%     listen socket's options to the client socket, starts a
%%     processor for it and arms the next accept. Stops the server if
%%     the options cannot be applied.
%%   - any other inet_async message: logged, then the server stops
%%     with that result as the reason.
%%   - everything else is ignored.
%%--------------------------------------------------------------------
handle_info({inet_async, LSock, AcceptRef, {ok, CSock}},
            #state{listen_socket = LSock, acceptor_ref = AcceptRef} = State) ->
    %% The head only matches the accept this server is waiting for:
    %% same listen socket and same reference as stored in the state.
    case set_sockopt(LSock, CSock) of
        ok ->
            #state{service = Service, handler = Handler} = State,
            start_processor(CSock, Service, Handler),
            {ok, NextRef} = prim_inet:async_accept(LSock, -1),
            {noreply, State#state{acceptor_ref = NextRef}};
        {error, Why} ->
            error_logger:error_msg("Couldn't set socket opts: ~p~n", [Why]),
            {stop, Why, State}
    end;

handle_info({inet_async, _LSock, _AcceptRef, Failure}, State) ->
    error_logger:error_msg("Error in acceptor: ~p~n", [Failure]),
    {stop, Failure, State};

handle_info(_Other, State) ->
    {noreply, State}.
|  | 135 |  | 
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by gen_server when the server is about to
%% terminate. No cleanup is performed here; both arguments are ignored
%% and gen_server discards the return value.
%%--------------------------------------------------------------------
terminate(_Why, _FinalState) ->
    ok.
|  | 145 |  | 
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No
%% conversion is needed: the state is returned exactly as received.
%%--------------------------------------------------------------------
code_change(_FromVsn, Current, _Extra) ->
    {ok, Current}.
|  | 152 |  | 
|  | 153 | %%-------------------------------------------------------------------- | 
|  | 154 | %%% Internal functions | 
|  | 155 | %%-------------------------------------------------------------------- | 
%% set_sockopt(ListenSocket, ClientSocket) -> ok | Error
%% Registers the freshly accepted ClientSocket with inet_db as an
%% inet_tcp socket, then copies a fixed set of options from
%% ListenSocket onto it. If reading or applying the options fails,
%% ClientSocket is closed and the failing result is returned as-is.
set_sockopt(ListenSocket, ClientSocket) ->
    true = inet_db:register_socket(ClientSocket, inet_tcp),
    Inherited = [active, nodelay, keepalive, delay_send, priority, tos],
    Fetched = prim_inet:getopts(ListenSocket, Inherited),
    case apply_sockopts(Fetched, ClientSocket) of
        ok ->
            ok;
        Failure ->
            gen_tcp:close(ClientSocket),
            Failure
    end.

%% Applies successfully fetched options to ClientSocket; a failed fetch
%% is passed through unchanged so the caller can clean up.
apply_sockopts({ok, Opts}, ClientSocket) ->
    prim_inet:setopts(ClientSocket, Opts);
apply_sockopts(Failure, _ClientSocket) ->
    Failure.
|  | 170 |  | 
%% start_processor(Socket, Service, Handler) -> pid()
%% Spawns (unlinked) a thrift_processor for an accepted client socket.
%% The processor receives this server's pid, a fun that builds the
%% protocol stack for Socket, the service and the handler module.
start_processor(Socket, Service, Handler) ->
    Owner = self(),
    %% Presumably invoked by thrift_processor from within the spawned
    %% process: it first asks this server to hand the socket over to the
    %% caller, then layers socket transport -> buffered transport ->
    %% binary protocol on top of it.
    MakeProtocol =
        fun() ->
                ok = take_socket(Owner, Socket),
                {ok, Raw} = thrift_socket_transport:new(Socket),
                {ok, Buffered} = thrift_buffered_transport:new(Raw),
                {ok, _Protocol} = Built = thrift_binary_protocol:new(Buffered),
                Built
        end,
    ProcessorArg = {Owner, MakeProtocol, Service, Handler},
    spawn(thrift_processor, init, [ProcessorArg]).