blob: f7c7a028f60b044cb1a187a28d5e94e400f5e0f1 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%% http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
David Reiss80862312008-06-11 00:59:55 +000019
David Reissea2cba82009-03-30 21:35:00 +000020-module(thrift_socket_server).
David Reiss80862312008-06-11 00:59:55 +000021
22-behaviour(gen_server).
23
24-export([start/1, stop/1]).
25
26-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
David Reiss1a2f2182008-06-11 01:14:01 +000027 handle_info/2]).
David Reiss80862312008-06-11 00:59:55 +000028
29-export([acceptor_loop/1]).
30
31-record(thrift_socket_server,
David Reiss1a2f2182008-06-11 01:14:01 +000032 {port,
David Reiss80862312008-06-11 00:59:55 +000033 service,
34 handler,
David Reiss1a2f2182008-06-11 01:14:01 +000035 name,
36 max=2048,
37 ip=any,
38 listen=null,
39 acceptor=null,
David Reissb42361c2009-09-09 17:18:57 +000040 socket_opts=[{recv_timeout, 500}],
41 framed=false
David Reissb7c88022008-06-11 01:00:20 +000042 }).
David Reiss80862312008-06-11 00:59:55 +000043
44start(State=#thrift_socket_server{}) ->
David Reiss80862312008-06-11 00:59:55 +000045 start_server(State);
46start(Options) ->
47 start(parse_options(Options)).
48
49stop(Name) when is_atom(Name) ->
50 gen_server:cast(Name, stop);
51stop(Pid) when is_pid(Pid) ->
52 gen_server:cast(Pid, stop);
53stop({local, Name}) ->
54 stop(Name);
55stop({global, Name}) ->
56 stop(Name);
57stop(Options) ->
58 State = parse_options(Options),
59 stop(State#thrift_socket_server.name).
60
61%% Internal API
62
63parse_options(Options) ->
64 parse_options(Options, #thrift_socket_server{}).
65
66parse_options([], State) ->
67 State;
68parse_options([{name, L} | Rest], State) when is_list(L) ->
69 Name = {local, list_to_atom(L)},
70 parse_options(Rest, State#thrift_socket_server{name=Name});
71parse_options([{name, A} | Rest], State) when is_atom(A) ->
72 Name = {local, A},
73 parse_options(Rest, State#thrift_socket_server{name=Name});
74parse_options([{name, Name} | Rest], State) ->
75 parse_options(Rest, State#thrift_socket_server{name=Name});
76parse_options([{port, L} | Rest], State) when is_list(L) ->
77 Port = list_to_integer(L),
78 parse_options(Rest, State#thrift_socket_server{port=Port});
79parse_options([{port, Port} | Rest], State) ->
80 parse_options(Rest, State#thrift_socket_server{port=Port});
81parse_options([{ip, Ip} | Rest], State) ->
82 ParsedIp = case Ip of
David Reiss1a2f2182008-06-11 01:14:01 +000083 any ->
84 any;
85 Ip when is_tuple(Ip) ->
86 Ip;
87 Ip when is_list(Ip) ->
88 {ok, IpTuple} = inet_parse:address(Ip),
89 IpTuple
90 end,
David Reiss80862312008-06-11 00:59:55 +000091 parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
David Reissb7c88022008-06-11 01:00:20 +000092parse_options([{socket_opts, L} | Rest], State) when is_list(L), length(L) > 0 ->
93 parse_options(Rest, State#thrift_socket_server{socket_opts=L});
David Reiss80862312008-06-11 00:59:55 +000094parse_options([{handler, Handler} | Rest], State) ->
95 parse_options(Rest, State#thrift_socket_server{handler=Handler});
96parse_options([{service, Service} | Rest], State) ->
97 parse_options(Rest, State#thrift_socket_server{service=Service});
98parse_options([{max, Max} | Rest], State) ->
99 MaxInt = case Max of
David Reiss1a2f2182008-06-11 01:14:01 +0000100 Max when is_list(Max) ->
101 list_to_integer(Max);
102 Max when is_integer(Max) ->
103 Max
104 end,
David Reissb42361c2009-09-09 17:18:57 +0000105 parse_options(Rest, State#thrift_socket_server{max=MaxInt});
106parse_options([{framed, Framed} | Rest], State) when is_boolean(Framed) ->
107 parse_options(Rest, State#thrift_socket_server{framed=Framed}).
David Reiss80862312008-06-11 00:59:55 +0000108
109start_server(State=#thrift_socket_server{name=Name}) ->
David Reiss80862312008-06-11 00:59:55 +0000110 case Name of
David Reiss1a2f2182008-06-11 01:14:01 +0000111 undefined ->
112 gen_server:start_link(?MODULE, State, []);
113 _ ->
114 gen_server:start_link(Name, ?MODULE, State, [])
David Reiss80862312008-06-11 00:59:55 +0000115 end.
116
117init(State=#thrift_socket_server{ip=Ip, port=Port}) ->
David Reissd74b0232008-06-11 01:02:55 +0000118 process_flag(trap_exit, true),
David Reiss80862312008-06-11 00:59:55 +0000119 BaseOpts = [binary,
David Reiss1a2f2182008-06-11 01:14:01 +0000120 {reuseaddr, true},
121 {packet, 0},
122 {backlog, 4096},
123 {recbuf, 8192},
124 {active, false}],
David Reiss80862312008-06-11 00:59:55 +0000125 Opts = case Ip of
David Reiss1a2f2182008-06-11 01:14:01 +0000126 any ->
David Reiss80862312008-06-11 00:59:55 +0000127 BaseOpts;
David Reiss1a2f2182008-06-11 01:14:01 +0000128 Ip ->
129 [{ip, Ip} | BaseOpts]
130 end,
David Reiss80862312008-06-11 00:59:55 +0000131 case gen_tcp_listen(Port, Opts, State) of
132 {stop, eacces} ->
133 %% fdsrv module allows another shot to bind
134 %% ports which require root access
135 case Port < 1024 of
136 true ->
137 case fdsrv:start() of
138 {ok, _} ->
139 case fdsrv:bind_socket(tcp, Port) of
140 {ok, Fd} ->
141 gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
142 _ ->
143 {stop, fdsrv_bind_failed}
144 end;
145 _ ->
146 {stop, fdsrv_start_failed}
147 end;
148 false ->
149 {stop, eacces}
150 end;
151 Other ->
152 error_logger:info_msg("thrift service listening on port ~p", [Port]),
153 Other
154 end.
155
156gen_tcp_listen(Port, Opts, State) ->
157 case gen_tcp:listen(Port, Opts) of
158 {ok, Listen} ->
David Reiss1a2f2182008-06-11 01:14:01 +0000159 {ok, ListenPort} = inet:port(Listen),
160 {ok, new_acceptor(State#thrift_socket_server{listen=Listen,
David Reiss80862312008-06-11 00:59:55 +0000161 port=ListenPort})};
David Reiss1a2f2182008-06-11 01:14:01 +0000162 {error, Reason} ->
163 {stop, Reason}
David Reiss80862312008-06-11 00:59:55 +0000164 end.
165
166new_acceptor(State=#thrift_socket_server{max=0}) ->
167 error_logger:error_msg("Not accepting new connections"),
168 State#thrift_socket_server{acceptor=null};
David Reiss5ed313d2010-08-30 22:05:57 +0000169new_acceptor(State=#thrift_socket_server{listen=Listen,
David Reissb7c88022008-06-11 01:00:20 +0000170 service=Service, handler=Handler,
David Reissb42361c2009-09-09 17:18:57 +0000171 socket_opts=Opts, framed=Framed
David Reissb7c88022008-06-11 01:00:20 +0000172 }) ->
David Reiss80862312008-06-11 00:59:55 +0000173 Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
David Reissb42361c2009-09-09 17:18:57 +0000174 [{self(), Listen, Service, Handler, Opts, Framed}]),
David Reiss80862312008-06-11 00:59:55 +0000175 State#thrift_socket_server{acceptor=Pid}.
176
David Reissb42361c2009-09-09 17:18:57 +0000177acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed})
David Reissb7c88022008-06-11 01:00:20 +0000178 when is_pid(Server), is_list(SocketOpts) ->
David Reissd74b0232008-06-11 01:02:55 +0000179 case catch gen_tcp:accept(Listen) of % infinite timeout
David Reiss1a2f2182008-06-11 01:14:01 +0000180 {ok, Socket} ->
181 gen_server:cast(Server, {accepted, self()}),
David Reiss80862312008-06-11 00:59:55 +0000182 ProtoGen = fun() ->
David Reissb7c88022008-06-11 01:00:20 +0000183 {ok, SocketTransport} = thrift_socket_transport:new(Socket, SocketOpts),
David Reissb42361c2009-09-09 17:18:57 +0000184 {ok, Transport} =
185 case Framed of
186 true -> thrift_framed_transport:new(SocketTransport);
187 false -> thrift_buffered_transport:new(SocketTransport)
188 end,
189 {ok, Protocol} = thrift_binary_protocol:new(Transport),
David Reiss035979f2010-08-30 22:05:38 +0000190 {ok, Protocol}
David Reiss80862312008-06-11 00:59:55 +0000191 end,
192 thrift_processor:init({Server, ProtoGen, Service, Handler});
David Reiss1a2f2182008-06-11 01:14:01 +0000193 {error, closed} ->
194 exit({error, closed});
195 Other ->
196 error_logger:error_report(
197 [{application, thrift},
198 "Accept failed error",
199 lists:flatten(io_lib:format("~p", [Other]))]),
200 exit({error, accept_failed})
David Reiss80862312008-06-11 00:59:55 +0000201 end.
202
203handle_call({get, port}, _From, State=#thrift_socket_server{port=Port}) ->
204 {reply, Port, State};
205handle_call(_Message, _From, State) ->
206 Res = error,
207 {reply, Res, State}.
208
209handle_cast({accepted, Pid},
David Reiss1a2f2182008-06-11 01:14:01 +0000210 State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
David Reiss80862312008-06-11 00:59:55 +0000211 % io:format("accepted ~p~n", [Pid]),
212 State1 = State#thrift_socket_server{max=Max - 1},
213 {noreply, new_acceptor(State1)};
214handle_cast(stop, State) ->
215 {stop, normal, State}.
216
217terminate(_Reason, #thrift_socket_server{listen=Listen, port=Port}) ->
218 gen_tcp:close(Listen),
219 case Port < 1024 of
220 true ->
221 catch fdsrv:stop(),
222 ok;
223 false ->
224 ok
225 end.
226
227code_change(_OldVsn, State, _Extra) ->
228 State.
229
230handle_info({'EXIT', Pid, normal},
David Reiss1a2f2182008-06-11 01:14:01 +0000231 State=#thrift_socket_server{acceptor=Pid}) ->
David Reiss80862312008-06-11 00:59:55 +0000232 {noreply, new_acceptor(State)};
233handle_info({'EXIT', Pid, Reason},
David Reiss1a2f2182008-06-11 01:14:01 +0000234 State=#thrift_socket_server{acceptor=Pid}) ->
David Reiss80862312008-06-11 00:59:55 +0000235 error_logger:error_report({?MODULE, ?LINE,
David Reiss1a2f2182008-06-11 01:14:01 +0000236 {acceptor_error, Reason}}),
David Reiss80862312008-06-11 00:59:55 +0000237 timer:sleep(100),
238 {noreply, new_acceptor(State)};
239handle_info({'EXIT', _LoopPid, Reason},
David Reiss1a2f2182008-06-11 01:14:01 +0000240 State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
David Reiss80862312008-06-11 00:59:55 +0000241 case Reason of
David Reiss1a2f2182008-06-11 01:14:01 +0000242 normal -> ok;
David Reiss4cf5a6a2008-06-11 01:00:59 +0000243 shutdown -> ok;
David Reiss1a2f2182008-06-11 01:14:01 +0000244 _ -> error_logger:error_report({?MODULE, ?LINE,
David Reiss80862312008-06-11 00:59:55 +0000245 {child_error, Reason, erlang:get_stacktrace()}})
246 end,
247 State1 = State#thrift_socket_server{max=Max + 1},
248 State2 = case Pid of
David Reiss1a2f2182008-06-11 01:14:01 +0000249 null -> new_acceptor(State1);
250 _ -> State1
251 end,
David Reiss80862312008-06-11 00:59:55 +0000252 {noreply, State2};
253handle_info(Info, State) ->
254 error_logger:info_report([{'INFO', Info}, {'State', State}]),
255 {noreply, State}.