blob: e9ad6f4c53070cf972956e28c295316e71e6c8c4 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%% http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
David Reiss80862312008-06-11 00:59:55 +000019
David Reissea2cba82009-03-30 21:35:00 +000020-module(thrift_socket_server).
David Reiss80862312008-06-11 00:59:55 +000021
22-behaviour(gen_server).
23
24-export([start/1, stop/1]).
25
26-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
David Reiss1a2f2182008-06-11 01:14:01 +000027 handle_info/2]).
David Reiss80862312008-06-11 00:59:55 +000028
29-export([acceptor_loop/1]).
30
%% Server state. The same record is also the parsed form of the option
%% list accepted by start/1 and stop/1.
-record(thrift_socket_server, {
    %% Port given to gen_tcp:listen/2; replaced by the value reported by
    %% inet:port/1 once the listen socket is open.
    port,
    %% Handed unchanged to thrift_processor:init/1 for every connection.
    service,
    handler,
    %% Registration name given to gen_server:start_link/4; left undefined
    %% for an unregistered server.
    name,
    %% Remaining connection budget: decremented on each accept and
    %% incremented when a connection process exits. At 0 no acceptor is
    %% spawned.
    max = 2048,
    %% `any', or an address added to the listen options as {ip, Ip}.
    ip = any,
    %% Listen socket, or null before the server is listening.
    listen = null,
    %% Pid of the current acceptor process, or null when none is running.
    acceptor = null,
    %% Options passed to the socket transport constructor.
    socket_opts = [{recv_timeout, 500}],
    %% `compact' or `json'; anything else selects the binary protocol.
    protocol = binary,
    %% true selects the framed transport, false the buffered transport.
    framed = false,
    %% true wraps accepted sockets in the SSL socket transport, which
    %% also receives ssloptions.
    ssltransport = false,
    ssloptions = []
}).
David Reiss80862312008-06-11 00:59:55 +000046
%% Starts the server from either a ready-made state record or an option
%% list (parsed with parse_options/1 first).
start(State) when is_record(State, thrift_socket_server) ->
    start_server(State);
start(Options) ->
    Parsed = parse_options(Options),
    start(Parsed).
51
%% Asks a running server to shut down by casting `stop' to it.
%%
%% The argument may be a pid, a locally registered atom, a {local, Name}
%% or {global, Name} registration tuple, or an option list in the format
%% accepted by start/1 (its `name' option identifies the server).
stop(Name) when is_atom(Name) ->
    gen_server:cast(Name, stop);
stop(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, stop);
stop({local, Name}) ->
    stop(Name);
stop({global, _} = GlobalName) ->
    %% A globally registered server is not in the local registry, so the
    %% cast has to use the {global, Name} server reference; casting to the
    %% bare name would look it up locally and never reach the server.
    gen_server:cast(GlobalName, stop);
stop(Options) ->
    State = parse_options(Options),
    stop(State#thrift_socket_server.name).
63
64%% Internal API
65
%% Converts an option list into a state record, starting from the record's
%% default field values.
parse_options(Options) ->
    Defaults = #thrift_socket_server{},
    parse_options(Options, Defaults).
68
%% Folds a list of {Key, Value} options into the state record.
%%
%% Recognised keys: name, port, ip, socket_opts, handler, service, max,
%% protocol, framed, ssltransport, ssloptions. String values are accepted
%% for name, port, ip and max and converted to the corresponding term.
%% An option that matches no clause (unknown key or wrongly typed value)
%% crashes with function_clause (or case_clause for ip/max) rather than
%% being silently ignored.
parse_options([], State) ->
    State;
parse_options([{name, L} | Rest], State) when is_list(L) ->
    Name = {local, list_to_atom(L)},
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{name, A} | Rest], State) when is_atom(A) ->
    Name = {local, A},
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{name, Name} | Rest], State) ->
    %% Already a registration tuple such as {local, _} or {global, _}.
    parse_options(Rest, State#thrift_socket_server{name=Name});
parse_options([{port, L} | Rest], State) when is_list(L) ->
    Port = list_to_integer(L),
    parse_options(Rest, State#thrift_socket_server{port=Port});
parse_options([{port, Port} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{port=Port});
parse_options([{ip, Ip} | Rest], State) ->
    ParsedIp = case Ip of
                   any ->
                       any;
                   Ip when is_tuple(Ip) ->
                       Ip;
                   Ip when is_list(Ip) ->
                       {ok, IpTuple} = inet_parse:address(Ip),
                       IpTuple
               end,
    parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
%% Only a non-empty list replaces the default socket options. Matching on
%% the cons cell avoids walking the whole list with length/1 in a guard.
parse_options([{socket_opts, [_ | _] = L} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{socket_opts=L});
parse_options([{handler, Handler} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{handler=Handler});
parse_options([{service, Service} | Rest], State) ->
    parse_options(Rest, State#thrift_socket_server{service=Service});
parse_options([{max, Max} | Rest], State) ->
    MaxInt = case Max of
                 Max when is_list(Max) ->
                     list_to_integer(Max);
                 Max when is_integer(Max) ->
                     Max
             end,
    parse_options(Rest, State#thrift_socket_server{max=MaxInt});
parse_options([{protocol, Proto} | Rest], State) when is_atom(Proto) ->
    parse_options(Rest, State#thrift_socket_server{protocol=Proto});
parse_options([{framed, Framed} | Rest], State) when is_boolean(Framed) ->
    parse_options(Rest, State#thrift_socket_server{framed=Framed});
parse_options([{ssltransport, SSLTransport} | Rest], State) when is_boolean(SSLTransport) ->
    parse_options(Rest, State#thrift_socket_server{ssltransport=SSLTransport});
parse_options([{ssloptions, SSLOptions} | Rest], State) when is_list(SSLOptions) ->
    parse_options(Rest, State#thrift_socket_server{ssloptions=SSLOptions}).
David Reiss80862312008-06-11 00:59:55 +0000120
%% Starts the linked gen_server, registering it only when a name was
%% configured.
start_server(#thrift_socket_server{name = undefined} = State) ->
    gen_server:start_link(?MODULE, State, []);
start_server(#thrift_socket_server{name = RegName} = State) ->
    gen_server:start_link(RegName, ?MODULE, State, []).
128
%% gen_server callback: opens the listen socket and spawns the first
%% acceptor.
%%
%% Returns {ok, State} with the listen socket and bound port filled in, or
%% {stop, Reason} when listening fails. Exits are trapped so that acceptor
%% and connection process exits arrive in handle_info/2.
init(State=#thrift_socket_server{ip=Ip, port=Port}) ->
    process_flag(trap_exit, true),
    BaseOpts = [binary,
                {reuseaddr, true},
                {packet, 0},
                {backlog, 4096},
                {recbuf, 8192},
                {active, false}],
    Opts = case Ip of
               any ->
                   BaseOpts;
               Ip ->
                   [{ip, Ip} | BaseOpts]
           end,
    Result = case gen_tcp_listen(Port, Opts, State) of
                 {stop, eacces} when Port < 1024 ->
                     %% fdsrv module allows another shot to bind
                     %% ports which require root access
                     listen_via_fdsrv(Port, Opts, State);
                 Other ->
                     Other
             end,
    %% Announce the service only when it is really listening, and report
    %% the port that was actually bound rather than the requested one.
    case Result of
        {ok, #thrift_socket_server{port=ListenPort}} ->
            error_logger:info_msg("thrift service listening on port ~p", [ListenPort]);
        _ ->
            ok
    end,
    Result.

%% Retries the listen on a file descriptor bound by fdsrv.
listen_via_fdsrv(Port, Opts, State) ->
    case fdsrv:start() of
        {ok, _} ->
            case fdsrv:bind_socket(tcp, Port) of
                {ok, Fd} ->
                    gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
                _ ->
                    {stop, fdsrv_bind_failed}
            end;
        _ ->
            {stop, fdsrv_start_failed}
    end.
167
%% Opens the listen socket. On success the socket and the port reported by
%% inet:port/1 are stored in the state and the first acceptor is spawned;
%% on failure the error reason is returned as {stop, Reason}.
gen_tcp_listen(Port, Opts, State) ->
    case gen_tcp:listen(Port, Opts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, ListenSocket} ->
            {ok, BoundPort} = inet:port(ListenSocket),
            Listening = State#thrift_socket_server{listen = ListenSocket,
                                                   port = BoundPort},
            {ok, new_acceptor(Listening)}
    end.
177
%% Spawns a linked acceptor process and records its pid in the state.
%% When the connection budget (max) is exhausted, no acceptor is spawned
%% and the acceptor field is reset to null.
new_acceptor(#thrift_socket_server{max = 0} = State) ->
    error_logger:error_msg("Not accepting new connections"),
    State#thrift_socket_server{acceptor = null};
new_acceptor(#thrift_socket_server{} = State) ->
    #thrift_socket_server{listen = Listen,
                          service = Service,
                          handler = Handler,
                          socket_opts = SocketOpts,
                          framed = Framed,
                          protocol = Proto,
                          ssltransport = SslTransport,
                          ssloptions = SslOptions} = State,
    AcceptorArgs = {self(), Listen, Service, Handler, SocketOpts, Framed,
                    SslTransport, SslOptions, Proto},
    Acceptor = proc_lib:spawn_link(?MODULE, acceptor_loop, [AcceptorArgs]),
    State#thrift_socket_server{acceptor = Acceptor}.
189
%% Body of an acceptor process: blocks until one connection is accepted,
%% tells the server about it, then hands the connection over to
%% thrift_processor:init/1.
%%
%% Exits with {error, closed} when the listen socket was closed and with
%% {error, accept_failed} for any other accept failure.
acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed, SslTransport, SslOptions, Proto})
  when is_pid(Server), is_list(SocketOpts) ->
    %% Only the accept call itself is protected by the try; everything in
    %% the `of' clauses runs outside it.
    try gen_tcp:accept(Listen) of % infinite timeout
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
            %% Builds the transport/protocol stack for the accepted socket;
            %% passed as a fun to thrift_processor:init/1.
            ProtoGen = fun() ->
                               {ok, SocketTransport} = case SslTransport of
                                                           true  -> thrift_sslsocket_transport:new(Socket, SocketOpts, SslOptions);
                                                           false -> thrift_socket_transport:new(Socket, SocketOpts)
                                                       end,
                               {ok, Transport}       = case Framed of
                                                           true  -> thrift_framed_transport:new(SocketTransport);
                                                           false -> thrift_buffered_transport:new(SocketTransport)
                                                       end,
                               {ok, Protocol}        = case Proto of
                                                           compact -> thrift_compact_protocol:new(Transport);
                                                           json    -> thrift_json_protocol:new(Transport);
                                                           _       -> thrift_binary_protocol:new(Transport)
                                                       end,
                               {ok, Protocol}
                       end,
            thrift_processor:init({Server, ProtoGen, Service, Handler});
        {error, closed} ->
            exit({error, closed});
        Other ->
            accept_failed(Other)
    catch
        Class:Reason ->
            accept_failed({Class, Reason})
    end.

%% Logs an accept failure and terminates the acceptor process.
accept_failed(What) ->
    error_logger:error_report(
      [{application, thrift},
       "Accept failed error",
       lists:flatten(io_lib:format("~p", [What]))]),
    exit({error, accept_failed}).
221
%% gen_server callback. {get, port} replies with the port stored in the
%% state; every other request is answered with the atom `error'.
handle_call({get, port}, _From, #thrift_socket_server{port = ListenPort} = State) ->
    {reply, ListenPort, State};
handle_call(_Request, _From, State) ->
    {reply, error, State}.
227
%% gen_server callback.
%%   stop            - terminates the server with reason `normal'.
%%   {accepted, Pid} - sent by the current acceptor once it holds a
%%                     connection; uses up one slot of the connection
%%                     budget and spawns a replacement acceptor.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast({accepted, Acceptor},
            #thrift_socket_server{acceptor = Acceptor, max = Remaining} = State) ->
    Charged = State#thrift_socket_server{max = Remaining - 1},
    {noreply, new_acceptor(Charged)}.
235
%% gen_server callback: closes the listen socket and, for ports below
%% 1024, makes a best-effort attempt to stop fdsrv.
%%
%% An error report with the process backtrace is written only for abnormal
%% termination reasons; a clean stop is not an error.
terminate(Reason, #thrift_socket_server{listen=Listen, port=Port}) ->
    gen_tcp:close(Listen),
    report_termination(Reason),
    case Port < 1024 of
        true ->
            %% fdsrv may never have been started; ignore any failure.
            catch fdsrv:stop(),
            ok;
        false ->
            ok
    end.

%% Writes an error report unless the server is stopping cleanly.
report_termination(normal) ->
    ok;
report_termination(shutdown) ->
    ok;
report_termination({shutdown, _}) ->
    ok;
report_termination(Reason) ->
    {backtrace, Bt} = erlang:process_info(self(), backtrace),
    error_logger:error_report({?MODULE, ?LINE,
                               {child_error, Reason, Bt}}).
248
%% gen_server callback for hot code upgrades. The state needs no
%% conversion, but the callback contract requires it to be returned as
%% {ok, NewState}; a bare state makes the upgrade fail.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
251
%% gen_server callback. Exits are trapped, so the exits of linked acceptor
%% and connection processes arrive here as 'EXIT' messages.
%%
%%   * current acceptor exited normally   - spawn a replacement.
%%   * current acceptor exited abnormally - report it, pause briefly, then
%%                                          spawn a replacement.
%%   * any other linked process exited    - give one slot back to the
%%                                          connection budget and, if no
%%                                          acceptor is running, spawn one.
%%   * anything else                      - logged and ignored.
handle_info({'EXIT', Pid, normal},
            State=#thrift_socket_server{acceptor=Pid}) ->
    {noreply, new_acceptor(State)};
handle_info({'EXIT', Pid, Reason},
            State=#thrift_socket_server{acceptor=Pid}) ->
    error_logger:error_report({?MODULE, ?LINE,
                               {acceptor_error, Reason}}),
    %% Short pause before re-spawning the acceptor.
    timer:sleep(100),
    {noreply, new_acceptor(State)};
handle_info({'EXIT', _LoopPid, Reason},
            State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
    case Reason of
        normal -> ok;
        shutdown -> ok;
        %% The exit reason is all that is known about the child's failure;
        %% this process has no stacktrace for another process's crash.
        _ -> error_logger:error_report({?MODULE, ?LINE,
                                        {child_error, Reason}})
    end,
    State1 = State#thrift_socket_server{max=Max + 1},
    State2 = case Pid of
                 null -> new_acceptor(State1);
                 _ -> State1
             end,
    {noreply, State2};
handle_info(Info, State) ->
    error_logger:info_report([{'INFO', Info}, {'State', State}]),
    {noreply, State}.
277 {noreply, State}.