blob: abeee8198a9e8716ec61c115f98b923b411c8167 [file] [log] [blame]
%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
David Reiss80862312008-06-11 00:59:55 +000019
David Reissea2cba82009-03-30 21:35:00 +000020-module(thrift_socket_server).
David Reiss80862312008-06-11 00:59:55 +000021
22-behaviour(gen_server).
23
Sergei Elin45764092022-09-23 23:21:31 +030024-include("thrift_constants.hrl").
25
26-export([start/1, stop/1]).
27-export([
28 init/1,
29 handle_call/3,
30 handle_cast/2,
31 terminate/2,
32 code_change/3,
33 handle_info/2
34]).
35-export([acceptor_loop/1]).
David Reiss80862312008-06-11 00:59:55 +000036
David Robakowskiae971ce2013-08-02 12:16:00 +020037-ifdef(TEST).
Sergei Elin45764092022-09-23 23:21:31 +030038-export([parse_options/1]).
David Robakowskiae971ce2013-08-02 12:16:00 +020039-endif.
David Reiss80862312008-06-11 00:59:55 +000040
%% Wire protocol selector. It is either a bare protocol name, a
%% {Name, Options} pair whose Options are passed as the second argument of
%% that protocol module's new/2, or {custom, Module, Options} for a
%% caller-supplied module that exports new/2.
-type protocol() ::
    compact
    | {compact, term()}
    | json
    | {json, term()}
    | binary
    | {binary, term()}
    | {custom, module(), term()}.
David Reiss80862312008-06-11 00:59:55 +000049
%% Options handed to the per-connection socket transport. The server's own
%% default value is [{recv_timeout, 500}], so that tuple must be part of the
%% type alongside the inet/gen_tcp options.
-type socket_opts() :: [
    {recv_timeout, timeout()} | inet:inet_backend() | gen_tcp:listen_option()
].
%% Options handed to the SSL socket transport when ssltransport is enabled.
-type ssl_opts() :: [ssl:tls_server_option()].
52
%% gen_server state of the listening process.
-record(thrift_socket_server, {
    %% Port the listen socket is bound to, as reported by inet:port/1.
    port :: inet:port_number(),
    %% Service definition: a single service module, or a map of service name
    %% to module when the option was given as a list.
    service :: module() | thrift_multiplexed_map_wrapper:service_handler_map(),
    %% Handler: a single handler module, or a map of service name to handler
    %% module when the option was given as a list.
    handler :: module() | thrift_multiplexed_map_wrapper:service_handler_map(),
    %% How many more connections may be accepted. Decremented when an acceptor
    %% reports a connection, incremented when a linked worker exits. No new
    %% acceptor is spawned while this is 0.
    acceptors_left :: non_neg_integer(),
    %% The listen socket; closed in terminate/2.
    listen :: gen_tcp:socket(),
    %% Process currently waiting in accept, or undefined when none is running.
    acceptor :: undefined | pid(),
    %% Options passed to the socket transport of every accepted connection.
    socket_opts :: socket_opts(),
    %% Protocol to instantiate on top of the transport.
    protocol :: protocol(),
    %% true selects the framed transport, false the buffered transport.
    framed :: boolean(),
    %% true wraps accepted sockets in the SSL socket transport.
    ssltransport :: boolean(),
    %% Options passed to the SSL socket transport.
    ssloptions :: ssl_opts()
}).
66
%% Parse the option list and start a linked socket server from the result.
%% The return value is whatever start_server/1 returns.
start(Options) ->
    ParsedOptions = parse_options(Options),
    start_server(ParsedOptions).
David Reiss80862312008-06-11 00:59:55 +000069
%% Ask a running socket server to stop. The request is an asynchronous
%% `stop' cast, so this returns without waiting for the server to terminate.
%%
%% The argument may be a pid, a locally registered name, one of the
%% gen_server name forms ({local, Name}, {global, Name}, {via, Module, Name}),
%% or an option list containing a `name' option (as accepted by start/1).
stop(Name) when is_atom(Name) ->
    gen_server:cast(Name, stop);
stop(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, stop);
stop({local, Name}) ->
    stop(Name);
stop({global, _GlobalName} = ServerRef) ->
    %% A globally registered server must be addressed as {global, Name};
    %% casting to the bare name would consult the local registry instead.
    gen_server:cast(ServerRef, stop);
stop({via, _Module, _ViaName} = ServerRef) ->
    gen_server:cast(ServerRef, stop);
stop(Options) ->
    %% Crashes with badmatch when the options carry no name: an unnamed
    %% server can only be stopped through its pid.
    #{name := Name} = parse_options(Options),
    stop(Name).
David Reiss80862312008-06-11 00:59:55 +000081
82%% Internal API
83
%% Convert an option list into an options map by running parse_options/2
%% with an empty map as the starting point.
parse_options(Options) ->
    EmptyState = maps:new(),
    parse_options(Options, EmptyState).
David Reiss80862312008-06-11 00:59:55 +000086
%% parse_options(Options, State) -> State1
%%
%% Folds the option list into the options map, one option per step.
%% Recognised options:
%%   {name, string() | atom() | term()}  strings and atoms become {local, Atom},
%%                                       any other term is stored unchanged
%%   {port, string() | 0..65535}
%%   {ip, any | tuple() | string()}      `any' leaves the map untouched
%%   {socket_opts, nonempty list}
%%   {handler, module() | [{ServiceName :: string(), module()}]}
%%   {service, module() | [{ServiceName :: string(), module()}]}
%%   {max, pos_integer()}
%%   {protocol, atom() | {compact | json | binary, Opts} | {custom, Module, Opts}}
%%   {framed, boolean()}
%%   {ssltransport, boolean()}
%%   {ssloptions, list()}
%%
%% Misconfigured handler/service options throw a descriptive string; any
%% other unrecognised or malformed option fails with function_clause.
parse_options([], State) ->
    State;
parse_options([{name, L} | Rest], State) when is_list(L) ->
    parse_options(Rest, State#{name => {local, list_to_atom(L)}});
parse_options([{name, A} | Rest], State) when is_atom(A) ->
    parse_options(Rest, State#{name => {local, A}});
parse_options([{name, Name} | Rest], State) ->
    parse_options(Rest, State#{name => Name});
parse_options([{port, L} | Rest], State) when is_list(L) ->
    %% Re-dispatch so a port given as a string gets the same range check as
    %% one given as an integer.
    parse_options([{port, list_to_integer(L)} | Rest], State);
parse_options([{port, Port} | Rest], State) when is_integer(Port), Port >= 0, Port =< 65535 ->
    parse_options(Rest, State#{port => Port});
parse_options([{ip, any} | Rest], State) ->
    parse_options(Rest, State);
parse_options([{ip, Ip} | Rest], State) when is_tuple(Ip) ->
    parse_options(Rest, State#{ip => Ip});
parse_options([{ip, Ip} | Rest], State) when is_list(Ip) ->
    {ok, IpTuple} = inet_parse:address(Ip),
    parse_options(Rest, State#{ip => IpTuple});
parse_options([{socket_opts, [_ | _] = L} | Rest], State) ->
    parse_options(Rest, State#{socket_opts => L});
parse_options([{handler, []} | _Rest], _State) ->
    throw("At least an error handler must be defined.");
parse_options([{handler, ServiceHandlerPropertyList} | Rest], State) when
    is_list(ServiceHandlerPropertyList)
->
    Misconfigured =
        "The handler option is not properly configured for multiplexed services. It should be a kind of [{\"error_handler\", Module::atom()}, {SericeName::list(), Module::atom()}, ...]",
    ServiceHandlerMap =
        case maps:get(handler, State, undefined) of
            undefined ->
                parse_multiplexed_proplist(ServiceHandlerPropertyList, Misconfigured);
            _ ->
                %% A handler was already configured by an earlier option.
                throw("Error while parsing the handler option.")
        end,
    %% A multiplexed handler list must contain the error handler entry.
    case thrift_multiplexed_map_wrapper:find(?MULTIPLEXED_ERROR_HANDLER_KEY, ServiceHandlerMap) of
        {ok, _ErrorHandler} ->
            parse_options(Rest, State#{handler => ServiceHandlerMap});
        error ->
            throw(Misconfigured)
    end;
parse_options([{handler, Handler} | Rest], State) when
    not is_map_key(handler, State), is_atom(Handler)
->
    parse_options(Rest, State#{handler => Handler});
parse_options([{service, []} | _Rest], _State) ->
    throw("At least one service module must be defined.");
parse_options([{service, ServiceModulePropertyList} | Rest], State) when
    is_list(ServiceModulePropertyList)
->
    ServiceModuleMap =
        case maps:get(service, State, undefined) of
            undefined ->
                parse_multiplexed_proplist(
                    ServiceModulePropertyList,
                    "The service option is not properly configured for multiplexed services. It should be a kind of [{SericeName::list(), ServiceModule::atom()}, ...]"
                );
            _ ->
                %% A service was already configured by an earlier option.
                throw("Error while parsing the service option.")
        end,
    parse_options(Rest, State#{service => ServiceModuleMap});
parse_options([{service, Service} | Rest], State) when
    not is_map_key(service, State), is_atom(Service)
->
    parse_options(Rest, State#{service => Service});
parse_options([{max, Max} | Rest], State) when is_integer(Max), Max > 0 ->
    parse_options(Rest, State#{max => Max});
parse_options([{protocol, Proto} | Rest], State) when is_atom(Proto) ->
    parse_options(Rest, State#{protocol => Proto});
parse_options([{protocol, {Kind, _ProtoOpts} = Proto} | Rest], State) when
    Kind =:= compact; Kind =:= json; Kind =:= binary
->
    %% {Kind, Options} forms are understood by acceptor_loop/1 and must be
    %% accepted here, otherwise they can never be configured.
    parse_options(Rest, State#{protocol => Proto});
parse_options([{protocol, {custom, Module, _ProtoOpts} = Proto} | Rest], State) when
    is_atom(Module)
->
    parse_options(Rest, State#{protocol => Proto});
parse_options([{framed, Framed} | Rest], State) when is_boolean(Framed) ->
    parse_options(Rest, State#{framed => Framed});
parse_options([{ssltransport, SSLTransport} | Rest], State) when is_boolean(SSLTransport) ->
    parse_options(Rest, State#{ssltransport => SSLTransport});
parse_options([{ssloptions, SSLOptions} | Rest], State) when is_list(SSLOptions) ->
    parse_options(Rest, State#{ssloptions => SSLOptions}).

%% Build a multiplexed map from a [{Name :: list(), Module :: atom()}] list,
%% throwing InvalidEntryMsg at the first entry that has any other shape.
parse_multiplexed_proplist(PropList, InvalidEntryMsg) ->
    lists:foldl(
        fun
            ({Name, Module}, Acc) when is_list(Name), is_atom(Module) ->
                thrift_multiplexed_map_wrapper:store(Name, Module, Acc);
            (_, _Acc) ->
                throw(InvalidEntryMsg)
        end,
        thrift_multiplexed_map_wrapper:new(),
        PropList
    ).
David Reiss80862312008-06-11 00:59:55 +0000191
%% Start the linked gen_server from a parsed options map. The server is
%% registered under the `name' option when one was given; without it the
%% server is started unregistered.
start_server(Options) ->
    %% `name' is optional: requiring the key in the function head would make
    %% the unregistered branch unreachable and crash unnamed servers.
    case maps:get(name, Options, undefined) of
        undefined ->
            gen_server:start_link(?MODULE, Options, []);
        Name ->
            gen_server:start_link(Name, ?MODULE, Options, [])
    end.
199
%% gen_server init/1 callback.
%%
%% Opens the listen socket on the configured port (bound to the `ip' option
%% when present), fills the server record from the options map using the
%% defaults below, and spawns the first acceptor. `service' and `handler'
%% have no default. Returns {stop, Reason} when listening fails.
init(State = #{port := Port}) ->
    %% Exits of linked acceptors/workers are delivered to handle_info/2.
    process_flag(trap_exit, true),
    IpOpts =
        case maps:get(ip, State, undefined) of
            undefined -> [];
            Ip -> [{ip, Ip}]
        end,
    ListenOpts =
        IpOpts ++
            [
                binary,
                {reuseaddr, true},
                {packet, 0},
                {backlog, 4096},
                {recbuf, 8192},
                {active, false}
            ],
    case gen_tcp:listen(Port, ListenOpts) of
        {error, Reason} ->
            {stop, Reason};
        {ok, Listen} ->
            %% Ask the socket for its port rather than reusing Port.
            {ok, ListenPort} = inet:port(Listen),
            Service = maps:get(service, State),
            Handler = maps:get(handler, State),
            Initial = #thrift_socket_server{
                port = ListenPort,
                service = Service,
                handler = Handler,
                acceptors_left = maps:get(max, State, 2048),
                listen = Listen,
                socket_opts = maps:get(socket_opts, State, [{recv_timeout, 500}]),
                protocol = maps:get(protocol, State, binary),
                framed = maps:get(framed, State, false),
                ssltransport = maps:get(ssltransport, State, false),
                ssloptions = maps:get(ssloptions, State, [])
            },
            {ok, new_acceptor(Initial)}
    end.
236
%% Spawn a linked acceptor process for the listen socket and record its pid
%% in the state. When no connection slots are left, nothing is spawned: an
%% error is logged and the acceptor field is cleared.
new_acceptor(State = #thrift_socket_server{acceptors_left = 0}) ->
    error_logger:error_msg("Not accepting new connections"),
    State#thrift_socket_server{acceptor = undefined};
new_acceptor(State = #thrift_socket_server{}) ->
    %% Argument tuple in the order acceptor_loop/1 destructures it.
    LoopArg = {
        self(),
        State#thrift_socket_server.listen,
        State#thrift_socket_server.service,
        State#thrift_socket_server.handler,
        State#thrift_socket_server.socket_opts,
        State#thrift_socket_server.framed,
        State#thrift_socket_server.ssltransport,
        State#thrift_socket_server.ssloptions,
        State#thrift_socket_server.protocol
    },
    AcceptorPid = proc_lib:spawn_link(?MODULE, acceptor_loop, [LoopArg]),
    State#thrift_socket_server{acceptor = AcceptorPid}.
David Reiss80862312008-06-11 00:59:55 +0000258
%% Body of an acceptor process (spawned by new_acceptor/1).
%%
%% Waits for one connection on Listen. On success it tells Server about it
%% with an {accepted, self()} cast and then hands over to
%% thrift_processor:init/1, passing a fun that builds the
%% transport/protocol stack for the accepted socket. On failure the process
%% exits with {error, closed} or {error, accept_failed}.
acceptor_loop(
    {Server, Listen, Service, Handler, SocketOpts, Framed, SslTransport, SslOptions, Proto}
) when
    is_pid(Server), is_list(SocketOpts)
->
    % infinite timeout
    case catch gen_tcp:accept(Listen) of
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
            ProtoGen = fun() ->
                {ok, SocketTransport} =
                    case SslTransport of
                        true -> thrift_sslsocket_transport:new(Socket, SocketOpts, SslOptions);
                        false -> thrift_socket_transport:new(Socket, SocketOpts)
                    end,
                {ok, Transport} =
                    case Framed of
                        true -> thrift_framed_transport:new(SocketTransport);
                        false -> thrift_buffered_transport:new(SocketTransport)
                    end,
                {ok, Protocol} = new_protocol(Proto, Transport),
                {ok, Protocol}
            end,
            thrift_processor:init({Server, ProtoGen, Service, Handler});
        {error, closed} ->
            exit({error, closed});
        Other ->
            error_logger:error_report(
                [
                    {application, thrift},
                    "Accept failed error",
                    lists:flatten(io_lib:format("~p", [Other]))
                ]
            ),
            exit({error, accept_failed})
    end.

%% Instantiate the configured protocol on top of Transport.
new_protocol(compact, Transport) ->
    thrift_compact_protocol:new(Transport);
new_protocol({compact, Options}, Transport) ->
    thrift_compact_protocol:new(Transport, Options);
new_protocol(json, Transport) ->
    thrift_json_protocol:new(Transport);
new_protocol({json, Options}, Transport) ->
    thrift_json_protocol:new(Transport, Options);
new_protocol(binary, Transport) ->
    thrift_binary_protocol:new(Transport);
new_protocol({binary, Options}, Transport) ->
    thrift_binary_protocol:new(Transport, Options);
new_protocol({custom, Module, Options}, Transport) ->
    %% erlang:function_exported/3 only inspects code that is already loaded,
    %% so load the module first; otherwise a valid but not-yet-loaded custom
    %% protocol would be rejected. A failed load simply leaves the check false.
    _ = code:ensure_loaded(Module),
    case erlang:function_exported(Module, new, 2) of
        true -> Module:new(Transport, Options);
        false -> throw("Could not instantiate custom protocol")
    end.
314
%% gen_server handle_call/3 callback. {get, port} replies with the port
%% stored in the state; every other request replies with the atom `error'.
%% The state is never changed.
handle_call(Request, _From, State) ->
    Reply =
        case {Request, State} of
            {{get, port}, #thrift_socket_server{port = Port}} -> Port;
            _ -> error
        end,
    {reply, Reply, State}.
320
%% gen_server handle_cast/2 callback.
%%   stop             terminates the server with reason `normal'.
%%   {accepted, Pid}  sent by the current acceptor once it holds a connection:
%%                    one slot is consumed and a replacement acceptor is
%%                    started via new_acceptor/1.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast({accepted, Pid}, State = #thrift_socket_server{acceptor = Pid}) ->
    Remaining = State#thrift_socket_server.acceptors_left - 1,
    Consumed = State#thrift_socket_server{acceptors_left = Remaining},
    {noreply, new_acceptor(Consumed)}.
330
%% gen_server terminate/2 callback. Closes the listen socket; for any reason
%% other than `normal' or `shutdown' it also logs an error report containing
%% the reason and this process's backtrace.
terminate(Reason, #thrift_socket_server{listen = Listen}) ->
    gen_tcp:close(Listen),
    IsCleanExit = Reason =:= normal orelse Reason =:= shutdown,
    case IsCleanExit of
        true ->
            ok;
        false ->
            {backtrace, Bt} = erlang:process_info(self(), backtrace),
            error_logger:error_report({?MODULE, ?LINE, {child_error, Reason, Bt}})
    end.
342
%% gen_server code_change/3 callback. No state migration is needed, so the
%% state is passed through unchanged. The callback contract requires the
%% result to be wrapped as {ok, NewState}; a bare state is not a valid return.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
345
%% gen_server handle_info/2 callback.
%%
%% 'EXIT' from the current acceptor: a replacement acceptor is started. An
%% abnormal exit is logged first and followed by a 100 ms pause.
%% 'EXIT' from any other linked process: its connection slot is returned,
%% abnormal reasons are logged, and an acceptor is started if none is running.
%% Anything else is logged together with the state and ignored.
handle_info({'EXIT', Pid, Reason}, State = #thrift_socket_server{acceptor = Pid}) ->
    case Reason of
        normal ->
            ok;
        _ ->
            error_logger:error_report({?MODULE, ?LINE, {acceptor_error, Reason}}),
            timer:sleep(100)
    end,
    {noreply, new_acceptor(State)};
handle_info({'EXIT', _WorkerPid, Reason}, State = #thrift_socket_server{}) ->
    IsCleanExit = Reason =:= normal orelse Reason =:= shutdown,
    case IsCleanExit of
        true -> ok;
        false -> error_logger:error_report({?MODULE, ?LINE, {child_error, Reason}})
    end,
    SlotsLeft = State#thrift_socket_server.acceptors_left + 1,
    Freed = State#thrift_socket_server{acceptors_left = SlotsLeft},
    NextState =
        case Freed#thrift_socket_server.acceptor of
            %% No acceptor is running, so start one now that a slot is free.
            undefined -> new_acceptor(Freed);
            _Running -> Freed
        end,
    {noreply, NextState};
handle_info(Info, State) ->
    error_logger:info_report([{'INFO', Info}, {'State', State}]),
    {noreply, State}.