Anthony F. Molinaro | 6e48e98 | 2011-07-12 18:56:15 +0000 | [diff] [blame] | 1 | %% |
| 2 | %% Licensed to the Apache Software Foundation (ASF) under one |
| 3 | %% or more contributor license agreements. See the NOTICE file |
| 4 | %% distributed with this work for additional information |
| 5 | %% regarding copyright ownership. The ASF licenses this file |
| 6 | %% to you under the Apache License, Version 2.0 (the |
| 7 | %% "License"); you may not use this file except in compliance |
| 8 | %% with the License. You may obtain a copy of the License at |
| 9 | %% |
| 10 | %% http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | %% |
| 12 | %% Unless required by applicable law or agreed to in writing, |
| 13 | %% software distributed under the License is distributed on an |
| 14 | %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | %% KIND, either express or implied. See the License for the |
| 16 | %% specific language governing permissions and limitations |
| 17 | %% under the License. |
| 18 | %% |
| 19 | |
| 20 | -module(thrift_reconnecting_client). |
| 21 | |
| 22 | -behaviour(gen_server). |
| 23 | |
| 24 | %% API |
| 25 | -export([ call/3, |
| 26 | get_stats/1, |
| 27 | get_and_reset_stats/1 ]). |
| 28 | |
| 29 | -export([ start_link/6 ]). |
| 30 | |
| 31 | %% gen_server callbacks |
| 32 | -export([ init/1, |
| 33 | handle_call/3, |
| 34 | handle_cast/2, |
| 35 | handle_info/2, |
| 36 | terminate/2, |
| 37 | code_change/3 ]). |
| 38 | |
| 39 | -record( state, { client = nil, |
| 40 | host, |
| 41 | port, |
| 42 | thrift_svc, |
| 43 | thrift_opts, |
| 44 | reconn_min, |
| 45 | reconn_max, |
| 46 | reconn_time, |
| 47 | op_cnt_dict, |
| 48 | op_time_dict } ). |
| 49 | |
| 50 | %%==================================================================== |
| 51 | %% API |
| 52 | %%==================================================================== |
| 53 | %%-------------------------------------------------------------------- |
| 54 | %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} |
| 55 | %% Description: Starts the server |
| 56 | %%-------------------------------------------------------------------- |
| 57 | start_link( Host, Port, |
| 58 | ThriftSvc, ThriftOpts, |
| 59 | ReconnMin, ReconnMax ) -> |
| 60 | gen_server:start_link( ?MODULE, |
| 61 | [ Host, Port, |
| 62 | ThriftSvc, ThriftOpts, |
| 63 | ReconnMin, ReconnMax ], |
| 64 | [] ). |
| 65 | |
| 66 | call( Pid, Op, Args ) -> |
| 67 | gen_server:call( Pid, { call, Op, Args } ). |
| 68 | |
| 69 | get_stats( Pid ) -> |
| 70 | gen_server:call( Pid, get_stats ). |
| 71 | |
| 72 | get_and_reset_stats( Pid ) -> |
| 73 | gen_server:call( Pid, get_and_reset_stats ). |
| 74 | |
| 75 | %%==================================================================== |
| 76 | %% gen_server callbacks |
| 77 | %%==================================================================== |
| 78 | |
| 79 | %%-------------------------------------------------------------------- |
| 80 | %% Function: init(Args) -> {ok, State} | |
| 81 | %% {ok, State, Timeout} | |
| 82 | %% ignore | |
| 83 | %% {stop, Reason} |
| 84 | %% Description: Start the server. |
| 85 | %%-------------------------------------------------------------------- |
| 86 | init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) -> |
| 87 | process_flag( trap_exit, true ), |
| 88 | |
| 89 | State = #state{ host = Host, |
| 90 | port = Port, |
| 91 | thrift_svc = TSvc, |
| 92 | thrift_opts = TOpts, |
| 93 | reconn_min = ReconnMin, |
| 94 | reconn_max = ReconnMax, |
| 95 | op_cnt_dict = dict:new(), |
| 96 | op_time_dict = dict:new() }, |
| 97 | |
| 98 | { ok, try_connect( State ) }. |
| 99 | |
| 100 | %%-------------------------------------------------------------------- |
| 101 | %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | |
| 102 | %% {reply, Reply, State, Timeout} | |
| 103 | %% {noreply, State} | |
| 104 | %% {noreply, State, Timeout} | |
| 105 | %% {stop, Reason, Reply, State} | |
| 106 | %% {stop, Reason, State} |
| 107 | %% Description: Handling call messages |
| 108 | %%-------------------------------------------------------------------- |
| 109 | handle_call( { call, Op, _ }, |
| 110 | _From, |
| 111 | State = #state{ client = nil } ) -> |
| 112 | { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) }; |
| 113 | |
| 114 | handle_call( { call, Op, Args }, |
| 115 | _From, |
| 116 | State=#state{ client = Client } ) -> |
| 117 | |
| 118 | Start = now(), |
| 119 | Result = ( catch thrift_client:call( Client, Op, Args) ), |
| 120 | Time = timer:now_diff( now(), Start ), |
| 121 | |
| 122 | case Result of |
| 123 | { C, { ok, Reply } } -> |
| 124 | S = incr_stats( Op, "success", Time, State#state{ client = C } ), |
| 125 | { reply, {ok, Reply }, S }; |
| 126 | { _, { E, Msg } } when E == error; E == exception -> |
| 127 | S = incr_stats( Op, "error", Time, try_connect( State ) ), |
| 128 | { reply, { E, Msg }, S }; |
| 129 | Other -> |
| 130 | S = incr_stats( Op, "error", Time, try_connect( State ) ), |
| 131 | { reply, Other, S } |
| 132 | end; |
| 133 | |
| 134 | handle_call( get_stats, |
| 135 | _From, |
| 136 | State = #state{} ) -> |
| 137 | { reply, stats( State ), State }; |
| 138 | |
| 139 | handle_call( get_and_reset_stats, |
| 140 | _From, |
| 141 | State = #state{} ) -> |
| 142 | { reply, stats( State ), reset_stats( State ) }. |
| 143 | |
| 144 | %%-------------------------------------------------------------------- |
| 145 | %% Function: handle_cast(Msg, State) -> {noreply, State} | |
| 146 | %% {noreply, State, Timeout} | |
| 147 | %% {stop, Reason, State} |
| 148 | %% Description: Handling cast messages |
| 149 | %%-------------------------------------------------------------------- |
| 150 | handle_cast( _Msg, State ) -> |
| 151 | { noreply, State }. |
| 152 | |
| 153 | %%-------------------------------------------------------------------- |
| 154 | %% Function: handle_info(Info, State) -> {noreply, State} | |
| 155 | %% {noreply, State, Timeout} | |
| 156 | %% {stop, Reason, State} |
| 157 | %% Description: Handling all non call/cast messages |
| 158 | %%-------------------------------------------------------------------- |
| 159 | handle_info( _Info, State ) -> |
| 160 | { noreply, State }. |
| 161 | |
| 162 | %%-------------------------------------------------------------------- |
| 163 | %% Function: terminate(Reason, State) -> void() |
| 164 | %% Description: This function is called by a gen_server when it is about to |
| 165 | %% terminate. It should be the opposite of Module:init/1 and do any necessary |
| 166 | %% cleaning up. When it returns, the gen_server terminates with Reason. |
| 167 | %% The return value is ignored. |
| 168 | %%-------------------------------------------------------------------- |
| 169 | terminate( _Reason, #state{ client = Client } ) -> |
| 170 | thrift_client:close( Client ), |
| 171 | ok. |
| 172 | |
| 173 | %%-------------------------------------------------------------------- |
| 174 | %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} |
| 175 | %% Description: Convert process state when code is changed |
| 176 | %%-------------------------------------------------------------------- |
| 177 | code_change( _OldVsn, State, _Extra ) -> |
| 178 | { ok, State }. |
| 179 | |
| 180 | %%-------------------------------------------------------------------- |
| 181 | %%% Internal functions |
| 182 | %%-------------------------------------------------------------------- |
| 183 | try_connect( State = #state{ client = OldClient, |
| 184 | host = Host, |
| 185 | port = Port, |
| 186 | thrift_svc = TSvc, |
| 187 | thrift_opts = TOpts } ) -> |
| 188 | |
| 189 | case OldClient of |
| 190 | nil -> ok; |
| 191 | _ -> ( catch thrift_client:close( OldClient ) ) |
| 192 | end, |
| 193 | |
| 194 | case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of |
| 195 | { ok, Client } -> |
| 196 | State#state{ client = Client, reconn_time = 0 }; |
| 197 | { E, Msg } when E == error; E == exception -> |
| 198 | ReconnTime = reconn_time( State ), |
| 199 | error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n", |
| 200 | [ self(), TSvc, Msg, ReconnTime ] ), |
| 201 | erlang:send_after( ReconnTime, self(), try_connect ), |
| 202 | State#state{ client = nil, reconn_time = ReconnTime } |
| 203 | end. |
| 204 | |
| 205 | |
| 206 | reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) -> |
| 207 | ReconnMin; |
| 208 | reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) -> |
| 209 | ReconnMax; |
| 210 | reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) -> |
| 211 | Backoff = 2 * R, |
| 212 | case Backoff > ReconnMax of |
| 213 | true -> ReconnMax; |
| 214 | false -> Backoff |
| 215 | end. |
| 216 | |
| 217 | |
| 218 | incr_stats( Op, Result, Time, |
| 219 | State = #state{ op_cnt_dict = OpCntDict, |
| 220 | op_time_dict = OpTimeDict } ) -> |
| 221 | Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ), |
| 222 | State#state{ op_cnt_dict = dict:update_counter( Key, 1, OpCntDict ), |
| 223 | op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }. |
| 224 | |
| 225 | |
| 226 | stats( #state{ thrift_svc = TSvc, |
| 227 | op_cnt_dict = OpCntDict, |
| 228 | op_time_dict = OpTimeDict } ) -> |
| 229 | Svc = atom_to_list(TSvc), |
| 230 | |
| 231 | F = fun( Key, Count, Stats ) -> |
| 232 | Name = lists:flatten( [ Svc, [ "_" | Key ] ] ), |
| 233 | Micros = dict:fetch( Key, OpTimeDict ), |
| 234 | [ { Name, Count, Micros } | Stats ] |
| 235 | end, |
| 236 | |
| 237 | dict:fold( F, [], OpCntDict ). |
| 238 | |
| 239 | reset_stats( State = #state{} ) -> |
| 240 | State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }. |