blob: 233b992cc579674376ed6996f87f08e3017414d9 [file] [log] [blame]
%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
David Reiss80862312008-06-11 00:59:55 +000019
David Reissea2cba82009-03-30 21:35:00 +000020-module(thrift_socket_server).
David Reiss80862312008-06-11 00:59:55 +000021
22-behaviour(gen_server).
23
24-export([start/1, stop/1]).
25
26-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
David Reiss1a2f2182008-06-11 01:14:01 +000027 handle_info/2]).
David Reiss80862312008-06-11 00:59:55 +000028
29-export([acceptor_loop/1]).
30
%% State of the socket server process; also used as the parsed form of the
%% option list accepted by start/1.
-record(thrift_socket_server,
        {
          %% Port handed to gen_tcp:listen/2; overwritten with the port
          %% reported by inet:port/1 once the listen socket is open.
          port,
          %% Service and handler, passed through to thrift_processor:init/1
          %% for every accepted connection.
          service,
          handler,
          %% Registration name given to gen_server:start_link/4
          %% (undefined = start unregistered).
          name,
          %% Remaining connection slots: decremented on every accept,
          %% incremented when a linked process exits; 0 stops accepting.
          max = 2048,
          %% Address to bind to, or `any' for no {ip, _} listen option.
          ip = any,
          %% Listen socket (null until init/1 has opened it).
          listen = null,
          %% Pid of the process currently blocked in accept, or null.
          acceptor = null,
          %% Options handed to the socket transport constructor.
          socket_opts = [{recv_timeout, 500}],
          %% true selects thrift_framed_transport, false the buffered one.
          framed = false,
          %% true selects thrift_sslsocket_transport with `ssloptions'.
          ssltransport = false,
          ssloptions = []
        }).
David Reiss80862312008-06-11 00:59:55 +000045
%% Start a socket server.
%%
%% The argument is either an already populated #thrift_socket_server{}
%% record or an option list, which is first converted with parse_options/1.
%% Returns the result of start_server/1.
start(#thrift_socket_server{} = ServerState) ->
    start_server(ServerState);
start(OptionList) ->
    start_server(parse_options(OptionList)).
50
%% Ask a running server to stop by sending it an asynchronous `stop' cast.
%%
%% The server can be identified by:
%%   - a locally registered name (atom) or {local, Name};
%%   - a pid;
%%   - {global, Name} for a globally registered server;
%%   - the option list it was started with, from which the `name' option
%%     is extracted via parse_options/1.
%%
%% Returns the result of gen_server:cast/2.
stop(Name) when is_atom(Name) ->
    gen_server:cast(Name, stop);
stop(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, stop);
stop({local, Name}) ->
    stop(Name);
stop({global, _GlobalName} = ServerRef) ->
    %% A globally registered server must be addressed as {global, Name};
    %% casting to the bare name would target a *local* registration and
    %% the message would be silently dropped.
    gen_server:cast(ServerRef, stop);
stop(Options) ->
    State = parse_options(Options),
    stop(State#thrift_socket_server.name).
62
%% Internal API
64
%% Convert an option list into a #thrift_socket_server{} record, starting
%% from the record's default field values.
parse_options(OptionList) ->
    Defaults = #thrift_socket_server{},
    parse_options(OptionList, Defaults).
67
%% Fold an option list into the server state record, one option at a time.
%%
%% Recognised options:
%%   {name, Name}         string or atom (stored as {local, Atom}), or any
%%                        other term, stored unchanged
%%   {port, Port}         string (converted with list_to_integer/1) or any
%%                        other term, stored unchanged
%%   {ip, Ip}             `any', an address tuple, or a string parsed with
%%                        inet_parse:address/1
%%   {socket_opts, Opts}  non-empty list
%%   {handler, Handler}
%%   {service, Service}
%%   {max, Max}           integer, or string converted with list_to_integer/1
%%   {framed, Bool}
%%   {ssltransport, Bool}
%%   {ssloptions, List}
%%
%% An unknown option, or a value of the wrong shape, crashes the call.
parse_options([], State) ->
    State;
parse_options([{name, L} | Rest], State) when is_list(L) ->
    %% list_to_atom/1 creates atoms that are never garbage collected, so
    %% string names should only come from trusted configuration.
    Name = {local, list_to_atom(L)},
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{name, A} | Rest], State) when is_atom(A) ->
    Name = {local, A},
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{name, Name} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{port, L} | Rest], State) when is_list(L) ->
    Port = list_to_integer(L),
    parse_options(Rest, State#thrift_socket_server{port=Port});
parse_options([{port, Port} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{port=Port});
parse_options([{ip, Ip} | Rest], State) ->
    ParsedIp = parse_ip_option(Ip),
    parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
parse_options([{socket_opts, [_ | _] = L} | Rest], State) ->
    %% Matching on [_ | _] checks for a non-empty list without the O(n)
    %% walk that a length/1 guard would perform.
    parse_options(Rest, State#thrift_socket_server{socket_opts=L});
parse_options([{handler, Handler} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{handler=Handler});
parse_options([{service, Service} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{service=Service});
parse_options([{max, Max} | Rest], State) ->
    MaxInt = parse_max_option(Max),
    parse_options(Rest, State#thrift_socket_server{max=MaxInt});
parse_options([{framed, Framed} | Rest], State) when is_boolean(Framed) ->
    parse_options(Rest, State#thrift_socket_server{framed=Framed});
parse_options([{ssltransport, SSLTransport} | Rest], State) when is_boolean(SSLTransport) ->
    parse_options(Rest, State#thrift_socket_server{ssltransport=SSLTransport});
parse_options([{ssloptions, SSLOptions} | Rest], State) when is_list(SSLOptions) ->
    parse_options(Rest, State#thrift_socket_server{ssloptions=SSLOptions}).

%% Normalise the value of the `ip' option: `any' and address tuples pass
%% through, strings are parsed into an address tuple.
parse_ip_option(any) ->
    any;
parse_ip_option(Ip) when is_tuple(Ip) ->
    Ip;
parse_ip_option(Ip) when is_list(Ip) ->
    {ok, IpTuple} = inet_parse:address(Ip),
    IpTuple.

%% Normalise the value of the `max' option to an integer.
parse_max_option(Max) when is_list(Max) ->
    list_to_integer(Max);
parse_max_option(Max) when is_integer(Max) ->
    Max.
David Reiss80862312008-06-11 00:59:55 +0000116
%% Start the gen_server process, linked to the caller.
%% Without a configured name the server is started unregistered; otherwise
%% the name is passed to gen_server:start_link/4 as the registration name.
start_server(#thrift_socket_server{name=undefined} = State) ->
    gen_server:start_link(?MODULE, State, []);
start_server(#thrift_socket_server{name=RegName} = State) ->
    gen_server:start_link(RegName, ?MODULE, State, []).
124
%% gen_server callback: open the listen socket and start the first acceptor.
%%
%% Exit signals are trapped so that the exits of linked processes arrive
%% as 'EXIT' messages in handle_info/2.  If listening fails with `eacces'
%% on a port below 1024, a second attempt is made through fdsrv.
%%
%% Returns {ok, State} on success or {stop, Reason} on failure.  The
%% "listening" message is logged only when a listen socket was actually
%% opened, and reports the port stored in the resulting state.
init(State=#thrift_socket_server{ip=Ip, port=Port}) ->
    process_flag(trap_exit, true),
    BaseOpts = [binary,
                {reuseaddr, true},
                {packet, 0},
                {backlog, 4096},
                {recbuf, 8192},
                {active, false}],
    Opts = case Ip of
               any ->
                   BaseOpts;
               _ ->
                   [{ip, Ip} | BaseOpts]
           end,
    Result = case gen_tcp_listen(Port, Opts, State) of
                 {stop, eacces} when Port < 1024 ->
                     %% fdsrv module allows another shot to bind
                     %% ports which require root access
                     listen_via_fdsrv(Port, Opts, State);
                 Other ->
                     Other
             end,
    case Result of
        {ok, #thrift_socket_server{port=BoundPort}} ->
            error_logger:info_msg("thrift service listening on port ~p", [BoundPort]);
        _ ->
            ok
    end,
    Result.

%% Obtain a file descriptor for Port from fdsrv and listen on it.
%% Returns the result of gen_tcp_listen/3, or {stop, fdsrv_start_failed} /
%% {stop, fdsrv_bind_failed} when fdsrv cannot provide the descriptor.
listen_via_fdsrv(Port, Opts, State) ->
    case fdsrv:start() of
        {ok, _} ->
            case fdsrv:bind_socket(tcp, Port) of
                {ok, Fd} ->
                    gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
                _ ->
                    {stop, fdsrv_bind_failed}
            end;
        _ ->
            {stop, fdsrv_start_failed}
    end.
163
%% Open a listen socket on Port with Opts.
%%
%% On success the socket and the port reported by inet:port/1 are stored in
%% the state, an acceptor is started, and {ok, NewState} is returned.
%% On failure returns {stop, Reason}.
gen_tcp_listen(Port, Opts, State) ->
    case gen_tcp:listen(Port, Opts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, ListenSocket} ->
            {ok, BoundPort} = inet:port(ListenSocket),
            Listening = State#thrift_socket_server{listen=ListenSocket,
                                                   port=BoundPort},
            {ok, new_acceptor(Listening)}
    end.
173
%% Start a fresh acceptor process, unless no connection slots remain.
%%
%% With max = 0 nothing is spawned, an error is logged and the acceptor
%% field is reset to null.  Otherwise a linked process running
%% acceptor_loop/1 is spawned and its pid recorded in the state.
new_acceptor(#thrift_socket_server{max=0} = State) ->
    error_logger:error_msg("Not accepting new connections"),
    State#thrift_socket_server{acceptor=null};
new_acceptor(#thrift_socket_server{listen=ListenSocket,
                                   service=Service,
                                   handler=Handler,
                                   socket_opts=SocketOpts,
                                   framed=Framed,
                                   ssltransport=UseSsl,
                                   ssloptions=SslOpts} = State) ->
    AcceptorArgs = {self(), ListenSocket, Service, Handler,
                    SocketOpts, Framed, UseSsl, SslOpts},
    AcceptorPid = proc_lib:spawn_link(?MODULE, acceptor_loop, [AcceptorArgs]),
    State#thrift_socket_server{acceptor=AcceptorPid}.
185
%% Body of an acceptor process (spawned by new_acceptor/1).
%%
%% Blocks in gen_tcp:accept/1 until a client connects, notifies the server
%% with an {accepted, self()} cast, and then hands over to
%% thrift_processor:init/1 together with a fun that builds the protocol
%% stack for the accepted socket:
%%   - SSL or plain socket transport, depending on SslTransport;
%%   - framed or buffered transport on top, depending on Framed;
%%   - the binary protocol on top of that.
%%
%% Exits with {error, closed} when the listen socket has been closed, and
%% with {error, accept_failed} (after logging) on any other failure.
acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed, SslTransport, SslOptions})
  when is_pid(Server), is_list(SocketOpts) ->
    %% try ... of protects only the accept call itself; the clauses after
    %% `of' run unprotected, so errors in the processor are not swallowed.
    try gen_tcp:accept(Listen) of % infinite timeout
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
            ProtoGen = fun() ->
                               {ok, SocketTransport} =
                                   case SslTransport of
                                       true ->
                                           thrift_sslsocket_transport:new(Socket, SocketOpts, SslOptions);
                                       false ->
                                           thrift_socket_transport:new(Socket, SocketOpts)
                                   end,
                               {ok, Transport} =
                                   case Framed of
                                       true ->
                                           thrift_framed_transport:new(SocketTransport);
                                       false ->
                                           thrift_buffered_transport:new(SocketTransport)
                                   end,
                               {ok, Protocol} = thrift_binary_protocol:new(Transport),
                               {ok, Protocol}
                       end,
            thrift_processor:init({Server, ProtoGen, Service, Handler});
        {error, closed} ->
            exit({error, closed});
        Other ->
            accept_failed(Other)
    catch
        Class:Reason ->
            accept_failed({Class, Reason})
    end.

%% Log an unexpected accept result and terminate the acceptor process.
accept_failed(What) ->
    error_logger:error_report(
      [{application, thrift},
       "Accept failed error",
       lists:flatten(io_lib:format("~p", [What]))]),
    exit({error, accept_failed}).
213
%% gen_server callback for synchronous requests.
%% {get, port} replies with the port stored in the state; every other
%% request is answered with the atom `error'.  The state is never changed.
handle_call({get, port}, _From, #thrift_socket_server{port=ListenPort} = State) ->
    {reply, ListenPort, State};
handle_call(_Request, _From, State) ->
    {reply, error, State}.
219
%% gen_server callback for asynchronous requests.
%%   stop             - terminate the server with reason `normal'.
%%   {accepted, Pid}  - sent by the current acceptor once it has accepted a
%%                      connection: one connection slot is consumed and a
%%                      replacement acceptor is started.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast({accepted, AcceptorPid},
            #thrift_socket_server{acceptor=AcceptorPid, max=Remaining} = State) ->
    Consumed = State#thrift_socket_server{max=Remaining - 1},
    {noreply, new_acceptor(Consumed)}.
227
%% gen_server callback: close the listen socket on shutdown.
%% For ports below 1024 fdsrv is additionally asked to stop; any failure
%% of that call is ignored.  Always returns ok.
terminate(_Reason, #thrift_socket_server{listen=ListenSocket, port=BoundPort}) ->
    gen_tcp:close(ListenSocket),
    if
        BoundPort < 1024 ->
            catch fdsrv:stop();
        true ->
            ok
    end,
    ok.
237
%% gen_server callback for hot code upgrade.
%% The state needs no conversion, so it is returned unchanged, wrapped in
%% the {ok, NewState} tuple that the gen_server behaviour requires.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
240
%% gen_server callback for plain messages.
%%
%% Because init/1 traps exits, the exits of linked processes arrive here:
%%   - the current acceptor exited normally: start a replacement;
%%   - the current acceptor exited abnormally: log it, pause briefly, then
%%     start a replacement;
%%   - any other linked process exited: one connection slot is given back,
%%     and if no acceptor is running (acceptor = null) a new one is started.
%%     Reasons other than normal/shutdown are logged.
%% Every other message is logged and ignored.
handle_info({'EXIT', Pid, normal},
            State=#thrift_socket_server{acceptor=Pid}) ->
    {noreply, new_acceptor(State)};
handle_info({'EXIT', Pid, Reason},
            State=#thrift_socket_server{acceptor=Pid}) ->
    error_logger:error_report({?MODULE, ?LINE,
                               {acceptor_error, Reason}}),
    %% Short pause so a persistently failing accept does not spin.
    timer:sleep(100),
    {noreply, new_acceptor(State)};
handle_info({'EXIT', _LoopPid, Reason},
            State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
    case Reason of
        normal -> ok;
        shutdown -> ok;
        %% No stacktrace is attached: this process did not catch an
        %% exception, so there is none to report beyond Reason itself.
        _ -> error_logger:error_report({?MODULE, ?LINE,
                                        {child_error, Reason}})
    end,
    State1 = State#thrift_socket_server{max=Max + 1},
    State2 = case Pid of
                 null -> new_acceptor(State1);
                 _ -> State1
             end,
    {noreply, State2};
handle_info(Info, State) ->
    error_logger:info_report([{'INFO', Info}, {'State', State}]),
    {noreply, State}.