blob: c9fe2aa036ad8af8542f40b89161a249cb13bd8f [file] [log] [blame]
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +00001%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%% http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
19
20-module(thrift_reconnecting_client).
21
22-behaviour(gen_server).
23
24%% API
Sergei Elin45764092022-09-23 23:21:31 +030025-export([
26 call/3,
27 get_stats/1,
28 get_and_reset_stats/1
29]).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000030
Sergei Elin45764092022-09-23 23:21:31 +030031-export([start_link/6]).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000032
33%% gen_server callbacks
Sergei Elin45764092022-09-23 23:21:31 +030034-export([
35 init/1,
36 handle_call/3,
37 handle_cast/2,
38 handle_info/2,
39 terminate/2,
40 code_change/3
41]).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000042
Sergei Elin45764092022-09-23 23:21:31 +030043-record(state, {
44 client = nil,
45 host,
46 port,
47 thrift_svc,
48 thrift_opts,
49 reconn_min,
50 reconn_max,
51 reconn_time = 0,
52 op_cnt_dict,
53 op_time_dict
54}).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000055
56%%====================================================================
57%% API
58%%====================================================================
59%%--------------------------------------------------------------------
60%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
61%% Description: Starts the server
62%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +030063start_link(
64 Host,
65 Port,
66 ThriftSvc,
67 ThriftOpts,
68 ReconnMin,
69 ReconnMax
70) ->
71 gen_server:start_link(
72 ?MODULE,
73 [
74 Host,
75 Port,
76 ThriftSvc,
77 ThriftOpts,
78 ReconnMin,
79 ReconnMax
80 ],
81 []
82 ).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000083
Sergei Elin45764092022-09-23 23:21:31 +030084call(Pid, Op, Args) ->
85 gen_server:call(Pid, {call, Op, Args}).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000086
Sergei Elin45764092022-09-23 23:21:31 +030087get_stats(Pid) ->
88 gen_server:call(Pid, get_stats).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000089
Sergei Elin45764092022-09-23 23:21:31 +030090get_and_reset_stats(Pid) ->
91 gen_server:call(Pid, get_and_reset_stats).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +000092
93%%====================================================================
94%% gen_server callbacks
95%%====================================================================
96
97%%--------------------------------------------------------------------
98%% Function: init(Args) -> {ok, State} |
99%% {ok, State, Timeout} |
100%% ignore |
101%% {stop, Reason}
102%% Description: Start the server.
103%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300104init([Host, Port, TSvc, TOpts, ReconnMin, ReconnMax]) ->
105 process_flag(trap_exit, true),
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000106
Sergei Elin45764092022-09-23 23:21:31 +0300107 State = #state{
108 host = Host,
109 port = Port,
110 thrift_svc = TSvc,
111 thrift_opts = TOpts,
112 reconn_min = ReconnMin,
113 reconn_max = ReconnMax,
114 op_cnt_dict = dict:new(),
115 op_time_dict = dict:new()
116 },
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000117
Sergei Elin45764092022-09-23 23:21:31 +0300118 {ok, try_connect(State)}.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000119
120%%--------------------------------------------------------------------
121%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
122%% {reply, Reply, State, Timeout} |
123%% {noreply, State} |
124%% {noreply, State, Timeout} |
125%% {stop, Reason, Reply, State} |
126%% {stop, Reason, State}
127%% Description: Handling call messages
128%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300129handle_call(
130 {call, Op, _},
131 _From,
132 State = #state{client = nil}
133) ->
134 {reply, {error, noconn}, incr_stats(Op, "failfast", 1, State)};
135handle_call(
136 {call, Op, Args},
137 _From,
138 State = #state{client = Client}
139) ->
140 Timer = timer_fun(),
141 Result = (catch thrift_client:call(Client, Op, Args)),
142 Time = Timer(),
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000143
Sergei Elin45764092022-09-23 23:21:31 +0300144 case Result of
145 {C, {ok, Reply}} ->
146 S = incr_stats(Op, "success", Time, State#state{client = C}),
147 {reply, {ok, Reply}, S};
148 {_, {E, Msg}} when E == error; E == exception ->
149 S = incr_stats(Op, "error", Time, try_connect(State)),
150 {reply, {E, Msg}, S};
151 Other ->
152 S = incr_stats(Op, "error", Time, try_connect(State)),
153 {reply, Other, S}
154 end;
155handle_call(
156 get_stats,
157 _From,
158 State = #state{}
159) ->
160 {reply, stats(State), State};
161handle_call(
162 get_and_reset_stats,
163 _From,
164 State = #state{}
165) ->
166 {reply, stats(State), reset_stats(State)}.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000167
168%%--------------------------------------------------------------------
169%% Function: handle_cast(Msg, State) -> {noreply, State} |
170%% {noreply, State, Timeout} |
171%% {stop, Reason, State}
172%% Description: Handling cast messages
173%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300174handle_cast(_Msg, State) ->
175 {noreply, State}.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000176
177%%--------------------------------------------------------------------
178%% Function: handle_info(Info, State) -> {noreply, State} |
179%% {noreply, State, Timeout} |
180%% {stop, Reason, State}
181%% Description: Handling all non call/cast messages
182%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300183handle_info(try_connect, State) ->
184 {noreply, try_connect(State)};
185handle_info(_Info, State) ->
186 {noreply, State}.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000187
188%%--------------------------------------------------------------------
189%% Function: terminate(Reason, State) -> void()
190%% Description: This function is called by a gen_server when it is about to
191%% terminate. It should be the opposite of Module:init/1 and do any necessary
192%% cleaning up. When it returns, the gen_server terminates with Reason.
193%% The return value is ignored.
194%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300195terminate(_Reason, #state{client = Client}) ->
196 thrift_client:close(Client),
197 ok.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000198
199%%--------------------------------------------------------------------
200%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
201%% Description: Convert process state when code is changed
202%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300203code_change(_OldVsn, State, _Extra) ->
204 {ok, State}.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000205
206%%--------------------------------------------------------------------
207%%% Internal functions
208%%--------------------------------------------------------------------
Sergei Elin45764092022-09-23 23:21:31 +0300209try_connect(
210 State = #state{
211 client = OldClient,
212 host = Host,
213 port = Port,
214 thrift_svc = TSvc,
215 thrift_opts = TOpts
216 }
217) ->
218 case OldClient of
219 nil -> ok;
220 _ -> (catch thrift_client:close(OldClient))
221 end,
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000222
Sergei Elin45764092022-09-23 23:21:31 +0300223 case catch thrift_client_util:new(Host, Port, TSvc, TOpts) of
224 {ok, Client} ->
225 State#state{client = Client, reconn_time = 0};
226 {E, Msg} when E == error; E == exception ->
227 ReconnTime = reconn_time(State),
228 error_logger:error_msg(
229 "[~w] ~w connect failed (~w), trying again in ~w ms~n",
230 [self(), TSvc, Msg, ReconnTime]
231 ),
232 erlang:send_after(ReconnTime, self(), try_connect),
233 State#state{client = nil, reconn_time = ReconnTime}
234 end.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000235
Sergei Elin45764092022-09-23 23:21:31 +0300236reconn_time(#state{reconn_min = ReconnMin, reconn_time = 0}) ->
237 ReconnMin;
238reconn_time(#state{reconn_max = ReconnMax, reconn_time = ReconnMax}) ->
239 ReconnMax;
240reconn_time(#state{reconn_max = ReconnMax, reconn_time = R}) ->
241 Backoff = 2 * R,
242 case Backoff > ReconnMax of
243 true -> ReconnMax;
244 false -> Backoff
245 end.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000246
Веселов Андрей8ab38b62016-04-29 13:12:05 +0300247-ifdef(time_correction).
248timer_fun() ->
Sergei Elin45764092022-09-23 23:21:31 +0300249 T1 = erlang:monotonic_time(),
250 fun() ->
251 T2 = erlang:monotonic_time(),
252 erlang:convert_time_unit(T2 - T1, native, micro_seconds)
253 end.
Веселов Андрей8ab38b62016-04-29 13:12:05 +0300254-else.
255timer_fun() ->
Sergei Elin45764092022-09-23 23:21:31 +0300256 T1 = erlang:timestamp(),
257 fun() ->
258 T2 = erlang:timestamp(),
259 timer:now_diff(T2, T1)
260 end.
Веселов Андрей8ab38b62016-04-29 13:12:05 +0300261-endif.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000262
Sergei Elin45764092022-09-23 23:21:31 +0300263incr_stats(
264 Op,
265 Result,
266 Time,
267 State = #state{
268 op_cnt_dict = OpCntDict,
269 op_time_dict = OpTimeDict
270 }
271) ->
272 Key = lists:flatten([atom_to_list(Op), ["_" | Result]]),
273 State#state{
274 op_cnt_dict = dict:update_counter(Key, 1, OpCntDict),
275 op_time_dict = dict:update_counter(Key, Time, OpTimeDict)
276 }.
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000277
Sergei Elin45764092022-09-23 23:21:31 +0300278stats(#state{
279 thrift_svc = TSvc,
280 op_cnt_dict = OpCntDict,
281 op_time_dict = OpTimeDict
282}) ->
283 Svc = atom_to_list(TSvc),
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000284
Sergei Elin45764092022-09-23 23:21:31 +0300285 F = fun(Key, Count, Stats) ->
286 Name = lists:flatten([Svc, ["_" | Key]]),
287 Micros = dict:fetch(Key, OpTimeDict),
288 [{Name, Count, Micros} | Stats]
289 end,
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000290
Sergei Elin45764092022-09-23 23:21:31 +0300291 dict:fold(F, [], OpCntDict).
Anthony F. Molinaro6e48e982011-07-12 18:56:15 +0000292
Sergei Elin45764092022-09-23 23:21:31 +0300293reset_stats(State = #state{}) ->
294 State#state{op_cnt_dict = dict:new(), op_time_dict = dict:new()}.