blob: 6794e6301b550f848d48211e90d1ccfa1a894fbb [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%% http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
David Reiss80862312008-06-11 00:59:55 +000019
David Reissea2cba82009-03-30 21:35:00 +000020-module(thrift_socket_server).
David Reiss80862312008-06-11 00:59:55 +000021
22-behaviour(gen_server).
23
24-export([start/1, stop/1]).
25
26-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
David Reiss1a2f2182008-06-11 01:14:01 +000027 handle_info/2]).
David Reiss80862312008-06-11 00:59:55 +000028
29-export([acceptor_loop/1]).
30
%% Server configuration; also used as the gen_server state.
-record(thrift_socket_server,
        {port,              %% port to listen on; replaced by the bound port once listening
         service,           %% handed unchanged to thrift_processor:init/1
         handler,           %% handed unchanged to thrift_processor:init/1
         name,              %% name passed to gen_server:start_link/4; undefined = unregistered
         max=2048,          %% remaining connection slots; no acceptor is spawned at 0
         ip=any,            %% address to bind, or any for no {ip, _} listen option
         listen=null,       %% listening socket, set by gen_tcp_listen/3
         acceptor=null,     %% pid of the current acceptor process, or null
         socket_opts=[{recv_timeout, 500}], %% passed to thrift_socket_transport:new/2
         framed=false       %% true: framed transport, false: buffered transport
        }).
David Reiss80862312008-06-11 00:59:55 +000043
%% Starts a socket server.
%% Takes either a ready-made #thrift_socket_server{} record or an option
%% list understood by parse_options/1, and returns the result of
%% start_server/1.
start(#thrift_socket_server{} = Config) ->
    start_server(Config);
start(OptionList) ->
    Config = parse_options(OptionList),
    start_server(Config).
48
%% Asks a running server to stop by sending it an asynchronous `stop' cast.
%%
%% The argument may be:
%%   - the locally registered name (an atom),
%%   - the server pid,
%%   - {local, Name} or {global, Name}, as stored by the `name' option,
%%   - an option list, in which case its `name' entry is used.
%% Returns ok (the result of gen_server:cast/2).
stop(Name) when is_atom(Name) ->
    gen_server:cast(Name, stop);
stop(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, stop);
stop({local, Name}) ->
    stop(Name);
stop({global, _GlobalName} = ServerRef) ->
    %% A globally registered server has to be addressed as {global, Name};
    %% casting to the bare name would look up a local registration instead
    %% and silently miss the server.
    gen_server:cast(ServerRef, stop);
stop(Options) ->
    State = parse_options(Options),
    stop(State#thrift_socket_server.name).
60
61%% Internal API
62
%% Builds a #thrift_socket_server{} record from an option list, starting
%% from the record defaults.
parse_options(Options) ->
    parse_options(Options, #thrift_socket_server{}).

%% Folds the options into the record one entry at a time.
%% Recognised keys: name, port, ip, socket_opts, handler, service, max,
%% framed.  Any other entry (or a value of the wrong shape) crashes.
parse_options([], Acc) ->
    Acc;
parse_options([{name, Given} | Tail], Acc) ->
    parse_options(Tail, Acc#thrift_socket_server{name=option_name(Given)});
parse_options([{port, Given} | Tail], Acc) ->
    parse_options(Tail, Acc#thrift_socket_server{port=option_port(Given)});
parse_options([{ip, Ip} | Tail], Acc) ->
    Address = case Ip of
                  any ->
                      any;
                  _ when is_tuple(Ip) ->
                      Ip;
                  _ when is_list(Ip) ->
                      %% textual address, e.g. "127.0.0.1"
                      {ok, Parsed} = inet_parse:address(Ip),
                      Parsed
              end,
    parse_options(Tail, Acc#thrift_socket_server{ip=Address});
parse_options([{socket_opts, SockOpts} | Tail], Acc)
  when is_list(SockOpts), length(SockOpts) > 0 ->
    parse_options(Tail, Acc#thrift_socket_server{socket_opts=SockOpts});
parse_options([{handler, Handler} | Tail], Acc) ->
    parse_options(Tail, Acc#thrift_socket_server{handler=Handler});
parse_options([{service, Service} | Tail], Acc) ->
    parse_options(Tail, Acc#thrift_socket_server{service=Service});
parse_options([{max, Max} | Tail], Acc) ->
    Limit = case Max of
                _ when is_list(Max) -> list_to_integer(Max);
                _ when is_integer(Max) -> Max
            end,
    parse_options(Tail, Acc#thrift_socket_server{max=Limit});
parse_options([{framed, Framed} | Tail], Acc) when is_boolean(Framed) ->
    parse_options(Tail, Acc#thrift_socket_server{framed=Framed}).

%% Strings and atoms become {local, Atom}; anything else is kept as given.
option_name(Name) when is_list(Name) -> {local, list_to_atom(Name)};
option_name(Name) when is_atom(Name) -> {local, Name};
option_name(Name) -> Name.

%% A port given as a string is converted to an integer; other values are
%% kept as given.
option_port(Port) when is_list(Port) -> list_to_integer(Port);
option_port(Port) -> Port.
David Reiss80862312008-06-11 00:59:55 +0000108
%% Starts the linked gen_server, registering it only when a name was
%% configured.
start_server(#thrift_socket_server{name=undefined} = State) ->
    gen_server:start_link(?MODULE, State, []);
start_server(#thrift_socket_server{name=Name} = State) ->
    gen_server:start_link(Name, ?MODULE, State, []).
116
%% gen_server callback: opens the listening socket and starts the first
%% acceptor.
%%
%% Returns {ok, State} with the `listen', `port' and `acceptor' fields
%% filled in, or {stop, Reason} when the socket could not be opened.
%% Exits of linked processes are trapped so that acceptor/connection exits
%% arrive as 'EXIT' messages in handle_info/2.
init(State=#thrift_socket_server{ip=Ip, port=Port}) ->
    process_flag(trap_exit, true),
    BaseOpts = [binary,
                {reuseaddr, true},
                {packet, 0},
                {backlog, 4096},
                {recbuf, 8192},
                {active, false}],
    Opts = case Ip of
               any ->
                   BaseOpts;
               _ ->
                   [{ip, Ip} | BaseOpts]
           end,
    Result = case gen_tcp_listen(Port, Opts, State) of
                 {stop, eacces} ->
                     listen_via_fdsrv(Port, Opts, State);
                 Other ->
                     Other
             end,
    log_listening(Result),
    Result.

%% Second attempt after gen_tcp:listen/2 failed with eacces.
%% fdsrv module allows another shot to bind ports which require root
%% access; ports from 1024 upwards are not retried.
listen_via_fdsrv(Port, Opts, State) when Port < 1024 ->
    case fdsrv:start() of
        {ok, _} ->
            case fdsrv:bind_socket(tcp, Port) of
                {ok, Fd} ->
                    gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
                _ ->
                    {stop, fdsrv_bind_failed}
            end;
        _ ->
            {stop, fdsrv_start_failed}
    end;
listen_via_fdsrv(_Port, _Opts, _State) ->
    {stop, eacces}.

%% Logs the listening port only when a socket was really opened, and
%% reports the port actually bound (which differs from the requested one
%% when port 0 was asked for).
log_listening({ok, #thrift_socket_server{port=ListenPort}}) ->
    error_logger:info_msg("thrift service listening on port ~p", [ListenPort]);
log_listening(_Failure) ->
    ok.
155
%% Opens the listening socket and spawns the first acceptor.
%% Returns {ok, State} with `listen' set to the socket and `port' set to the
%% port reported by inet:port/1, or {stop, Reason} when gen_tcp:listen/2
%% fails.
gen_tcp_listen(Port, Opts, State) ->
    case gen_tcp:listen(Port, Opts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, ListenSocket} ->
            {ok, BoundPort} = inet:port(ListenSocket),
            Listening = State#thrift_socket_server{listen=ListenSocket,
                                                   port=BoundPort},
            {ok, new_acceptor(Listening)}
    end.
165
%% Spawns a fresh linked acceptor process and records its pid in the state.
%% When no connection slots are left (max = 0) nothing is spawned, an error
%% is logged and `acceptor' is reset to null; handle_info/2 starts a new
%% acceptor once a linked process exits and frees a slot.
new_acceptor(State=#thrift_socket_server{max=0}) ->
    error_logger:error_msg("Not accepting new connections"),
    State#thrift_socket_server{acceptor=null};
new_acceptor(State=#thrift_socket_server{listen=Listen,
                                         service=Service, handler=Handler,
                                         socket_opts=Opts, framed=Framed}) ->
    Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
                              [{self(), Listen, Service, Handler, Opts, Framed}]),
    State#thrift_socket_server{acceptor=Pid}.
177
%% Body of an acceptor process (exported only so it can be spawned).
%%
%% Blocks in gen_tcp:accept/1 until a client connects, tells the server
%% about it with an {accepted, self()} cast, and then hands this process
%% over to thrift_processor:init/1 to serve the connection.
%% Exits with {error, closed} when accept reports a closed socket and with
%% {error, accept_failed} (after logging) on any other accept result.
acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed})
  when is_pid(Server), is_list(SocketOpts) ->
    %% `catch' is kept on purpose: for a raised error the logged term then
    %% still carries the stack trace.
    case catch gen_tcp:accept(Listen) of % infinite timeout
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
            %% Fun passed to thrift_processor:init/1; it builds the
            %% transport/protocol stack on the accepted socket and returns
            %% the same protocol for input and output.
            ProtoGen = fun() ->
                               {ok, SocketTransport} = thrift_socket_transport:new(Socket, SocketOpts),
                               {ok, Transport} =
                                   case Framed of
                                       true  -> thrift_framed_transport:new(SocketTransport);
                                       false -> thrift_buffered_transport:new(SocketTransport)
                                   end,
                               {ok, Protocol} = thrift_binary_protocol:new(Transport),
                               {ok, Protocol, Protocol}
                       end,
            thrift_processor:init({Server, ProtoGen, Service, Handler});
        {error, closed} ->
            exit({error, closed});
        Other ->
            error_logger:error_report(
              [{application, thrift},
               "Accept failed error",
               lists:flatten(io_lib:format("~p", [Other]))]),
            exit({error, accept_failed})
    end.
203
%% gen_server callback for synchronous requests.
%% {get, port} replies with the port stored in the state; every other
%% request replies with the atom `error'.
handle_call({get, port}, _From, #thrift_socket_server{port=Port} = State) ->
    {reply, Port, State};
handle_call(_Request, _From, State) ->
    {reply, error, State}.
209
%% gen_server callback for asynchronous requests.
%% {accepted, Pid} from the current acceptor: that process now serves a
%% connection, so one slot is consumed and a replacement acceptor is
%% started.  `stop' terminates the server normally.
handle_cast({accepted, Acceptor},
            #thrift_socket_server{acceptor=Acceptor, max=Remaining} = State) ->
    Occupied = State#thrift_socket_server{max=Remaining - 1},
    {noreply, new_acceptor(Occupied)};
handle_cast(stop, State) ->
    {stop, normal, State}.
217
%% gen_server callback: closes the listening socket and, for ports below
%% 1024, makes a best-effort attempt to stop fdsrv (any failure of that
%% call is ignored).  Always returns ok.
terminate(_Reason, #thrift_socket_server{listen=ListenSocket, port=Port}) ->
    gen_tcp:close(ListenSocket),
    if
        Port < 1024 ->
            catch fdsrv:stop();
        true ->
            ok
    end,
    ok.
227
%% gen_server callback for hot code upgrades; the state is carried over
%% unchanged.  The callback contract requires {ok, NewState} -- returning
%% the bare state makes the upgrade fail.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
230
%% gen_server callback for plain messages; mostly 'EXIT' signals of linked
%% processes (exits are trapped in init/1).
%%
%%   - current acceptor exited normally: start a replacement;
%%   - current acceptor exited abnormally: log, pause 100 ms, start a
%%     replacement;
%%   - any other linked process exited: give its connection slot back
%%     and, if no acceptor is running (acceptor = null), start one;
%%   - anything else is logged and ignored.
handle_info({'EXIT', Pid, normal},
            State=#thrift_socket_server{acceptor=Pid}) ->
    {noreply, new_acceptor(State)};
handle_info({'EXIT', Pid, Reason},
            State=#thrift_socket_server{acceptor=Pid}) ->
    error_logger:error_report({?MODULE, ?LINE,
                               {acceptor_error, Reason}}),
    %% NOTE(review): presumably a back-off so a persistently failing accept
    %% does not respawn in a tight loop; it blocks the server for 100 ms.
    timer:sleep(100),
    {noreply, new_acceptor(State)};
handle_info({'EXIT', _LoopPid, Reason},
            State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
    case Reason of
        normal -> ok;
        shutdown -> ok;
        %% Only the exit reason is reported: a stack trace fetched in this
        %% process would not describe the crash of the linked process.
        _ -> error_logger:error_report({?MODULE, ?LINE,
                                        {child_error, Reason}})
    end,
    State1 = State#thrift_socket_server{max=Max + 1},
    State2 = case Pid of
                 null -> new_acceptor(State1);
                 _ -> State1
             end,
    {noreply, State2};
handle_info(Info, State) ->
    error_logger:info_report([{'INFO', Info}, {'State', State}]),
    {noreply, State}.