| %% |
| %% Licensed to the Apache Software Foundation (ASF) under one |
| %% or more contributor license agreements. See the NOTICE file |
| %% distributed with this work for additional information |
| %% regarding copyright ownership. The ASF licenses this file |
| %% to you under the Apache License, Version 2.0 (the |
| %% "License"); you may not use this file except in compliance |
| %% with the License. You may obtain a copy of the License at |
| %% |
| %% http://www.apache.org/licenses/LICENSE-2.0 |
| %% |
| %% Unless required by applicable law or agreed to in writing, |
| %% software distributed under the License is distributed on an |
| %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| %% KIND, either express or implied. See the License for the |
| %% specific language governing permissions and limitations |
| %% under the License. |
| %% |
| |
| -module(thrift_reconnecting_client). |
| |
| -behaviour(gen_server). |
| |
| %% API |
| -export([ |
| call/3, |
| get_stats/1, |
| get_and_reset_stats/1 |
| ]). |
| |
| -export([start_link/6]). |
| |
| %% gen_server callbacks |
| -export([ |
| init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| terminate/2, |
| code_change/3 |
| ]). |
| |
| -record(state, { |
| client = nil, |
| host, |
| port, |
| thrift_svc, |
| thrift_opts, |
| reconn_min, |
| reconn_max, |
| reconn_time = 0, |
| op_cnt_dict, |
| op_time_dict |
| }). |
| |
| %%==================================================================== |
| %% API |
| %%==================================================================== |
| %%-------------------------------------------------------------------- |
| %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} |
| %% Description: Starts the server |
| %%-------------------------------------------------------------------- |
| start_link( |
| Host, |
| Port, |
| ThriftSvc, |
| ThriftOpts, |
| ReconnMin, |
| ReconnMax |
| ) -> |
| gen_server:start_link( |
| ?MODULE, |
| [ |
| Host, |
| Port, |
| ThriftSvc, |
| ThriftOpts, |
| ReconnMin, |
| ReconnMax |
| ], |
| [] |
| ). |
| |
| call(Pid, Op, Args) -> |
| gen_server:call(Pid, {call, Op, Args}). |
| |
| get_stats(Pid) -> |
| gen_server:call(Pid, get_stats). |
| |
| get_and_reset_stats(Pid) -> |
| gen_server:call(Pid, get_and_reset_stats). |
| |
| %%==================================================================== |
| %% gen_server callbacks |
| %%==================================================================== |
| |
| %%-------------------------------------------------------------------- |
| %% Function: init(Args) -> {ok, State} | |
| %% {ok, State, Timeout} | |
| %% ignore | |
| %% {stop, Reason} |
| %% Description: Start the server. |
| %%-------------------------------------------------------------------- |
| init([Host, Port, TSvc, TOpts, ReconnMin, ReconnMax]) -> |
| process_flag(trap_exit, true), |
| |
| State = #state{ |
| host = Host, |
| port = Port, |
| thrift_svc = TSvc, |
| thrift_opts = TOpts, |
| reconn_min = ReconnMin, |
| reconn_max = ReconnMax, |
| op_cnt_dict = dict:new(), |
| op_time_dict = dict:new() |
| }, |
| |
| {ok, try_connect(State)}. |
| |
| %%-------------------------------------------------------------------- |
| %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | |
| %% {reply, Reply, State, Timeout} | |
| %% {noreply, State} | |
| %% {noreply, State, Timeout} | |
| %% {stop, Reason, Reply, State} | |
| %% {stop, Reason, State} |
| %% Description: Handling call messages |
| %%-------------------------------------------------------------------- |
| handle_call( |
| {call, Op, _}, |
| _From, |
| State = #state{client = nil} |
| ) -> |
| {reply, {error, noconn}, incr_stats(Op, "failfast", 1, State)}; |
| handle_call( |
| {call, Op, Args}, |
| _From, |
| State = #state{client = Client} |
| ) -> |
| Timer = timer_fun(), |
| Result = (catch thrift_client:call(Client, Op, Args)), |
| Time = Timer(), |
| |
| case Result of |
| {C, {ok, Reply}} -> |
| S = incr_stats(Op, "success", Time, State#state{client = C}), |
| {reply, {ok, Reply}, S}; |
| {_, {E, Msg}} when E == error; E == exception -> |
| S = incr_stats(Op, "error", Time, try_connect(State)), |
| {reply, {E, Msg}, S}; |
| Other -> |
| S = incr_stats(Op, "error", Time, try_connect(State)), |
| {reply, Other, S} |
| end; |
| handle_call( |
| get_stats, |
| _From, |
| State = #state{} |
| ) -> |
| {reply, stats(State), State}; |
| handle_call( |
| get_and_reset_stats, |
| _From, |
| State = #state{} |
| ) -> |
| {reply, stats(State), reset_stats(State)}. |
| |
| %%-------------------------------------------------------------------- |
| %% Function: handle_cast(Msg, State) -> {noreply, State} | |
| %% {noreply, State, Timeout} | |
| %% {stop, Reason, State} |
| %% Description: Handling cast messages |
| %%-------------------------------------------------------------------- |
| handle_cast(_Msg, State) -> |
| {noreply, State}. |
| |
| %%-------------------------------------------------------------------- |
| %% Function: handle_info(Info, State) -> {noreply, State} | |
| %% {noreply, State, Timeout} | |
| %% {stop, Reason, State} |
| %% Description: Handling all non call/cast messages |
| %%-------------------------------------------------------------------- |
| handle_info(try_connect, State) -> |
| {noreply, try_connect(State)}; |
| handle_info(_Info, State) -> |
| {noreply, State}. |
| |
| %%-------------------------------------------------------------------- |
| %% Function: terminate(Reason, State) -> void() |
| %% Description: This function is called by a gen_server when it is about to |
| %% terminate. It should be the opposite of Module:init/1 and do any necessary |
| %% cleaning up. When it returns, the gen_server terminates with Reason. |
| %% The return value is ignored. |
| %%-------------------------------------------------------------------- |
| terminate(_Reason, #state{client = Client}) -> |
| thrift_client:close(Client), |
| ok. |
| |
| %%-------------------------------------------------------------------- |
| %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} |
| %% Description: Convert process state when code is changed |
| %%-------------------------------------------------------------------- |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| %%-------------------------------------------------------------------- |
| %%% Internal functions |
| %%-------------------------------------------------------------------- |
| try_connect( |
| State = #state{ |
| client = OldClient, |
| host = Host, |
| port = Port, |
| thrift_svc = TSvc, |
| thrift_opts = TOpts |
| } |
| ) -> |
| case OldClient of |
| nil -> ok; |
| _ -> (catch thrift_client:close(OldClient)) |
| end, |
| |
| case catch thrift_client_util:new(Host, Port, TSvc, TOpts) of |
| {ok, Client} -> |
| State#state{client = Client, reconn_time = 0}; |
| {E, Msg} when E == error; E == exception -> |
| ReconnTime = reconn_time(State), |
| error_logger:error_msg( |
| "[~w] ~w connect failed (~w), trying again in ~w ms~n", |
| [self(), TSvc, Msg, ReconnTime] |
| ), |
| erlang:send_after(ReconnTime, self(), try_connect), |
| State#state{client = nil, reconn_time = ReconnTime} |
| end. |
| |
| reconn_time(#state{reconn_min = ReconnMin, reconn_time = 0}) -> |
| ReconnMin; |
| reconn_time(#state{reconn_max = ReconnMax, reconn_time = ReconnMax}) -> |
| ReconnMax; |
| reconn_time(#state{reconn_max = ReconnMax, reconn_time = R}) -> |
| Backoff = 2 * R, |
| case Backoff > ReconnMax of |
| true -> ReconnMax; |
| false -> Backoff |
| end. |
| |
| -ifdef(time_correction). |
| timer_fun() -> |
| T1 = erlang:monotonic_time(), |
| fun() -> |
| T2 = erlang:monotonic_time(), |
| erlang:convert_time_unit(T2 - T1, native, micro_seconds) |
| end. |
| -else. |
| timer_fun() -> |
| T1 = erlang:timestamp(), |
| fun() -> |
| T2 = erlang:timestamp(), |
| timer:now_diff(T2, T1) |
| end. |
| -endif. |
| |
| incr_stats( |
| Op, |
| Result, |
| Time, |
| State = #state{ |
| op_cnt_dict = OpCntDict, |
| op_time_dict = OpTimeDict |
| } |
| ) -> |
| Key = lists:flatten([atom_to_list(Op), ["_" | Result]]), |
| State#state{ |
| op_cnt_dict = dict:update_counter(Key, 1, OpCntDict), |
| op_time_dict = dict:update_counter(Key, Time, OpTimeDict) |
| }. |
| |
| stats(#state{ |
| thrift_svc = TSvc, |
| op_cnt_dict = OpCntDict, |
| op_time_dict = OpTimeDict |
| }) -> |
| Svc = atom_to_list(TSvc), |
| |
| F = fun(Key, Count, Stats) -> |
| Name = lists:flatten([Svc, ["_" | Key]]), |
| Micros = dict:fetch(Key, OpTimeDict), |
| [{Name, Count, Micros} | Stats] |
| end, |
| |
| dict:fold(F, [], OpCntDict). |
| |
| reset_stats(State = #state{}) -> |
| State#state{op_cnt_dict = dict:new(), op_time_dict = dict:new()}. |