blob: bf7677811e289cba1fc14979baf7ffe066405973 [file] [log] [blame]
%%%-------------------------------------------------------------------
%%% File        : thrift_socket_server.erl
%%% Author      : eugene letuchy <eletuchy@facebook.com>
%%% Description : A rewrite of thrift_server, based quite heavily
%%%               on the mochiweb_socket_server module of mochiweb
%%% Created     : 3 Mar 2008 by eugene letuchy <eletuchy@facebook.com>
%%%-------------------------------------------------------------------
8-module(thrift_socket_server).
9
10-author('eletuchy@facebook.com').
11-author('todd@lipcon.org').
12
13-behaviour(gen_server).
14
15-export([start/1, stop/1]).
16
17-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
18 handle_info/2]).
19
20-export([acceptor_loop/1]).
21
%% Server state. The same record is also the parsed form of the option
%% list accepted by start/1 (see parse_options/2).
-record(thrift_socket_server, {
    %% Port handed to gen_tcp:listen/2; replaced by the actually bound
    %% port once the listening socket is open.
    port,
    %% Passed through unchanged to thrift_processor:init/1.
    service,
    %% Passed through unchanged to thrift_processor:init/1.
    handler,
    %% Registration name given to gen_server:start_link/4;
    %% undefined starts the server unregistered.
    name,
    %% Remaining connection slots: decremented per accepted connection,
    %% incremented when a connection process exits. No acceptor is
    %% spawned while this is 0.
    max = 2048,
    %% `any', or the address placed in the {ip, _} listen option.
    ip = any,
    %% Listening socket, set by gen_tcp_listen/3.
    listen = null,
    %% Pid of the process currently waiting in accept, or null.
    acceptor = null,
    %% Options handed to thrift_socket_transport:new/2.
    socket_opts = [{recv_timeout, 500}]
}).
David Reiss80862312008-06-11 00:59:55 +000033
%% @doc Start the socket server.
%%
%% Accepts either an already built #thrift_socket_server{} record or an
%% option list, which is first converted with parse_options/1.
%% Returns whatever start_server/1 returns.
%%
%% The server state is no longer dumped to stdout here: that was a
%% leftover debugging aid and library code should not write to stdout.
start(#thrift_socket_server{} = State) ->
    start_server(State);
start(Options) ->
    start(parse_options(Options)).
39
%% @doc Ask a running server to stop (asynchronously, via a `stop' cast).
%%
%% The server may be identified by:
%%   - a locally registered name (atom),
%%   - a pid,
%%   - {local, Name} or {global, Name}, i.e. the same term that was used
%%     as the registration name,
%%   - an option list as accepted by start/1; its `name' option is used.
stop(Name) when is_atom(Name) ->
    gen_server:cast(Name, stop);
stop(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, stop);
stop({local, Name}) ->
    stop(Name);
stop({global, _GlobalName} = ServerRef) ->
    %% The {global, Name} tuple must be passed on as-is. Stripping the tag
    %% would address a *locally* registered process of that name, so the
    %% globally registered server would never receive the stop request.
    gen_server:cast(ServerRef, stop);
stop(Options) ->
    #thrift_socket_server{name = Name} = parse_options(Options),
    stop(Name).
51
52%% Internal API
53
%% @doc Convert an option list into a #thrift_socket_server{} record,
%% starting from the record defaults.
%%
%% Recognised options:
%%   {name, Name}      string or atom (registered locally), or any other
%%                     term, stored unchanged
%%   {port, Port}      integer, or a string holding an integer
%%   {ip, Ip}          `any', an address tuple, or an address string
%%   {socket_opts, L}  non-empty list, stored unchanged
%%   {handler, H}      stored unchanged
%%   {service, S}      stored unchanged
%%   {max, Max}        integer, or a string holding an integer
%%
%% Any other element crashes with function_clause: option lists are
%% validated by matching, not silently ignored.
parse_options(Options) ->
    parse_options(Options, #thrift_socket_server{}).

parse_options([], State) ->
    State;
parse_options([{name, L} | Rest], State) when is_list(L) ->
    Name = {local, list_to_atom(L)},
    parse_options(Rest, State#thrift_socket_server{name = Name});
parse_options([{name, A} | Rest], State) when is_atom(A) ->
    Name = {local, A},
    parse_options(Rest, State#thrift_socket_server{name = Name});
parse_options([{name, Name} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{name = Name});
parse_options([{port, L} | Rest], State) when is_list(L) ->
    Port = list_to_integer(L),
    parse_options(Rest, State#thrift_socket_server{port = Port});
parse_options([{port, Port} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{port = Port});
parse_options([{ip, Ip} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{ip = parse_ip(Ip)});
%% Matching on [_ | _] checks for a non-empty list without the O(n)
%% traversal that a length/1 guard performs.
parse_options([{socket_opts, [_ | _] = Opts} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{socket_opts = Opts});
parse_options([{handler, Handler} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{handler = Handler});
parse_options([{service, Service} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{service = Service});
parse_options([{max, Max} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{max = parse_max(Max)}).

%% Normalise the `ip' option: `any' and tuples pass through, strings are
%% parsed with inet_parse:address/1 (crashing on an unparsable address).
parse_ip(any) ->
    any;
parse_ip(Ip) when is_tuple(Ip) ->
    Ip;
parse_ip(Ip) when is_list(Ip) ->
    {ok, IpTuple} = inet_parse:address(Ip),
    IpTuple.

%% Normalise the `max' option to an integer.
parse_max(Max) when is_list(Max) ->
    list_to_integer(Max);
parse_max(Max) when is_integer(Max) ->
    Max.
97
%% Start the gen_server, linked to the caller.
%%
%% With no name configured the server is started unregistered; otherwise
%% the configured name is passed to gen_server:start_link/4 as the
%% registration name. The state record becomes the argument of init/1.
%%
%% The former unconditional "starting" print to stdout was debugging
%% residue and has been dropped; init/1 reports through error_logger.
start_server(#thrift_socket_server{name = undefined} = State) ->
    gen_server:start_link(?MODULE, State, []);
start_server(#thrift_socket_server{name = Name} = State) ->
    gen_server:start_link(Name, ?MODULE, State, []).
106
%% @doc gen_server callback: open the listening socket and spawn the
%% first acceptor.
%%
%% Returns {ok, State} with `listen', `port' and `acceptor' filled in, or
%% {stop, Reason} when the socket could not be opened. If binding fails
%% with eacces on a port below 1024, one more attempt is made through
%% the fdsrv module.
init(State = #thrift_socket_server{ip = Ip, port = Port}) ->
    %% Exits of the acceptor and of connection processes are handled in
    %% handle_info/2 (originally marked "only temporary").
    process_flag(trap_exit, true),
    Opts = listen_opts(Ip),
    Result = case gen_tcp_listen(Port, Opts, State) of
                 {stop, eacces} ->
                     listen_via_fdsrv(Port, Opts, State);
                 Other ->
                     Other
             end,
    log_listening(Result),
    Result.

%% Build the gen_tcp:listen/2 option list, binding to Ip unless it is `any'.
listen_opts(Ip) ->
    BaseOpts = [binary,
                {reuseaddr, true},
                {packet, 0},
                {backlog, 30},
                {recbuf, 8192},
                {active, false}],
    case Ip of
        any -> BaseOpts;
        _ -> [{ip, Ip} | BaseOpts]
    end.

%% Second chance for ports that require root access: let fdsrv bind the
%% socket and listen on the descriptor it returns. For ports >= 1024 the
%% original eacces failure is reported unchanged.
listen_via_fdsrv(Port, Opts, State) when Port < 1024 ->
    case fdsrv:start() of
        {ok, _} ->
            case fdsrv:bind_socket(tcp, Port) of
                {ok, Fd} ->
                    gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
                _ ->
                    {stop, fdsrv_bind_failed}
            end;
        _ ->
            {stop, fdsrv_start_failed}
    end;
listen_via_fdsrv(_Port, _Opts, _State) ->
    {stop, eacces}.

%% Log only when the socket is really open, and report the port that was
%% actually bound (it differs from the requested one when port 0 is
%% given). Previously the message was also emitted for {stop, Reason}.
log_listening({ok, #thrift_socket_server{port = ListenPort}}) ->
    error_logger:info_msg("thrift service listening on port ~p", [ListenPort]);
log_listening(_Failure) ->
    ok.
145
%% Open a listening socket on Port with Opts.
%%
%% On success the socket and the port it is bound to (as reported by
%% inet:port/1) are stored in the state, an acceptor is spawned, and
%% {ok, NewState} is returned. On failure returns {stop, Reason}, which
%% init/1 hands straight back to gen_server.
gen_tcp_listen(Port, Opts, State) ->
    case gen_tcp:listen(Port, Opts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, ListenSocket} ->
            {ok, BoundPort} = inet:port(ListenSocket),
            Listening = State#thrift_socket_server{listen = ListenSocket,
                                                   port = BoundPort},
            {ok, new_acceptor(Listening)}
    end.
155
%% Spawn a linked acceptor process for the listening socket and record
%% its pid in the state.
%%
%% When no connection slots are left (max = 0) nothing is spawned and
%% `acceptor' is reset to null; handle_info/2 spawns a new acceptor once
%% a connection process exits and frees a slot.
new_acceptor(State = #thrift_socket_server{max = 0}) ->
    error_logger:error_msg("Not accepting new connections"),
    State#thrift_socket_server{acceptor = null};
new_acceptor(State = #thrift_socket_server{listen = Listen,
                                           service = Service,
                                           handler = Handler,
                                           socket_opts = Opts}) ->
    %% The previous acceptor pid is intentionally not bound here: it was
    %% only used by a commented-out log line and caused an
    %% unused-variable warning.
    Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
                              [{self(), Listen, Service, Handler, Opts}]),
    State#thrift_socket_server{acceptor = Pid}.
167
%% @doc Body of an acceptor process (exported only so it can be spawned
%% by new_acceptor/1 through proc_lib).
%%
%% Blocks in gen_tcp:accept/1. Once a connection arrives the server is
%% told via an {accepted, self()} cast, and this process then hands
%% itself to thrift_processor:init/1 together with a fun that builds the
%% protocol on top of the accepted socket.
%%
%% Exits with {error, closed} when accept reports a closed socket, and
%% with {error, accept_failed} (after logging) for any other result.
acceptor_loop({Server, Listen, Service, Handler, SocketOpts})
  when is_pid(Server), is_list(SocketOpts) ->
    %% `catch' is kept on purpose: an exception raised by accept is turned
    %% into an {'EXIT', _} term that still carries its stacktrace into the
    %% error report below.
    case catch gen_tcp:accept(Listen) of % infinite timeout
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
            ProtoGen = fun() ->
                               {ok, SocketTransport} =
                                   thrift_socket_transport:new(Socket, SocketOpts),
                               {ok, BufferedTransport} =
                                   thrift_buffered_transport:new(SocketTransport),
                               {ok, Protocol} =
                                   thrift_binary_protocol:new(BufferedTransport),
                               %% The same protocol is used for input and
                               %% output.
                               {ok, Protocol, Protocol}
                       end,
            thrift_processor:init({Server, ProtoGen, Service, Handler});
        {error, closed} ->
            exit({error, closed});
        Other ->
            error_logger:error_report(
              [{application, thrift},
               "Accept failed error",
               lists:flatten(io_lib:format("~p", [Other]))]),
            exit({error, accept_failed})
    end.
189
%% @doc gen_server callback for synchronous requests.
%%
%% {get, port} replies with the port stored in the state; every other
%% request replies with the atom `error'. The state is never modified.
handle_call(Request, _From, State) ->
    {reply, call_reply(Request, State), State}.

%% Compute the reply for a call request.
call_reply({get, port}, #thrift_socket_server{port = Port}) ->
    Port;
call_reply(_Request, _State) ->
    error.
195
%% @doc gen_server callback for asynchronous requests.
%%
%% stop               - terminate the server with reason `normal'.
%% {accepted, Pid}    - sent by the current acceptor once it has taken a
%%                      connection: one slot is consumed and a new
%%                      acceptor is spawned to replace it. Pid must be
%%                      the acceptor recorded in the state.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast({accepted, Acceptor},
            #thrift_socket_server{acceptor = Acceptor, max = Slots} = State) ->
    Busier = State#thrift_socket_server{max = Slots - 1},
    {noreply, new_acceptor(Busier)}.
203
%% @doc gen_server callback: close the listening socket and, for ports
%% below 1024, stop fdsrv. Always returns ok.
terminate(_Reason, #thrift_socket_server{listen = Listen, port = Port}) ->
    gen_tcp:close(Listen),
    stop_fdsrv_if_privileged(Port).

%% fdsrv is only ever started for ports below 1024 (see init/1), so it is
%% only stopped for those. Failures while stopping it are ignored
%% (best effort).
stop_fdsrv_if_privileged(Port) when Port < 1024 ->
    catch fdsrv:stop(),
    ok;
stop_fdsrv_if_privileged(_Port) ->
    ok.
213
%% @doc gen_server callback for hot code upgrade; the state is kept
%% unchanged.
%%
%% gen_server requires {ok, NewState} (or {error, Reason}) from
%% code_change/3. Returning the bare state, as this function used to,
%% makes the upgrade fail.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
216
%% @doc gen_server callback for raw messages. Because init/1 traps
%% exits, this is where the exits of linked processes arrive.
%%
%%   - The current acceptor exited normally: spawn a replacement.
%%   - The current acceptor exited abnormally: log it, pause briefly and
%%     spawn a replacement.
%%   - Any other linked process exited (a former acceptor that went on to
%%     serve a connection): free its slot and, if acceptance had been
%%     suspended because no slots were left, resume it.
%%   - Anything else is logged and ignored.
handle_info({'EXIT', Pid, normal},
            State = #thrift_socket_server{acceptor = Pid}) ->
    {noreply, new_acceptor(State)};
handle_info({'EXIT', Pid, Reason},
            State = #thrift_socket_server{acceptor = Pid}) ->
    error_logger:error_report({?MODULE, ?LINE,
                               {acceptor_error, Reason}}),
    %% Short pause so a persistently failing accept does not turn into a
    %% tight respawn loop.
    timer:sleep(100),
    {noreply, new_acceptor(State)};
handle_info({'EXIT', _LoopPid, Reason},
            State = #thrift_socket_server{acceptor = Pid, max = Max}) ->
    case Reason of
        normal -> ok;
        protocol_closed -> ok;
        %% Only the exit reason is reported. erlang:get_stacktrace/0 is
        %% no longer called here: outside an exception handler it returns
        %% the last stacktrace of *this* process, which has nothing to do
        %% with the child that exited.
        _ -> error_logger:error_report({?MODULE, ?LINE,
                                        {child_error, Reason}})
    end,
    State1 = State#thrift_socket_server{max = Max + 1},
    State2 = case Pid of
                 %% null means new_acceptor/1 had stopped accepting at
                 %% max = 0; a slot is free again.
                 null -> new_acceptor(State1);
                 _ -> State1
             end,
    {noreply, State2};
handle_info(Info, State) ->
    error_logger:info_report([{'INFO', Info}, {'State', State}]),
    {noreply, State}.
242 {noreply, State}.