THRIFT-5635 Update Erlang client for Erlang 23-25
Client: erl
Patch: Sergey Yelin
This closes #2677
Summary of changes:
- Add useful compiler options
- Format sources using erlfmt
- Switch to modern callbacks in thrift_* modules
- Add static analysis (Dialyzer), disabled by default
- Add/fix types for API calls
NOTE: Enabling static analysis requires additional tweaks in the multiplexer module.
diff --git a/lib/erl/src/thrift_reconnecting_client.erl b/lib/erl/src/thrift_reconnecting_client.erl
index 538fd3a..c9fe2aa 100644
--- a/lib/erl/src/thrift_reconnecting_client.erl
+++ b/lib/erl/src/thrift_reconnecting_client.erl
@@ -22,30 +22,36 @@
-behaviour(gen_server).
%% API
--export([ call/3,
- get_stats/1,
- get_and_reset_stats/1 ]).
+-export([
+ call/3,
+ get_stats/1,
+ get_and_reset_stats/1
+]).
--export([ start_link/6 ]).
+-export([start_link/6]).
%% gen_server callbacks
--export([ init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3 ]).
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
--record( state, { client = nil,
- host,
- port,
- thrift_svc,
- thrift_opts,
- reconn_min,
- reconn_max,
- reconn_time = 0,
- op_cnt_dict,
- op_time_dict } ).
+-record(state, {
+ client = nil,
+ host,
+ port,
+ thrift_svc,
+ thrift_opts,
+ reconn_min,
+ reconn_max,
+ reconn_time = 0,
+ op_cnt_dict,
+ op_time_dict
+}).
%%====================================================================
%% API
@@ -54,23 +60,35 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
-start_link( Host, Port,
- ThriftSvc, ThriftOpts,
- ReconnMin, ReconnMax ) ->
- gen_server:start_link( ?MODULE,
- [ Host, Port,
- ThriftSvc, ThriftOpts,
- ReconnMin, ReconnMax ],
- [] ).
+start_link(
+ Host,
+ Port,
+ ThriftSvc,
+ ThriftOpts,
+ ReconnMin,
+ ReconnMax
+) ->
+ gen_server:start_link(
+ ?MODULE,
+ [
+ Host,
+ Port,
+ ThriftSvc,
+ ThriftOpts,
+ ReconnMin,
+ ReconnMax
+ ],
+ []
+ ).
-call( Pid, Op, Args ) ->
- gen_server:call( Pid, { call, Op, Args } ).
+call(Pid, Op, Args) ->
+ gen_server:call(Pid, {call, Op, Args}).
-get_stats( Pid ) ->
- gen_server:call( Pid, get_stats ).
+get_stats(Pid) ->
+ gen_server:call(Pid, get_stats).
-get_and_reset_stats( Pid ) ->
- gen_server:call( Pid, get_and_reset_stats ).
+get_and_reset_stats(Pid) ->
+ gen_server:call(Pid, get_and_reset_stats).
%%====================================================================
%% gen_server callbacks
@@ -83,19 +101,21 @@
%% {stop, Reason}
%% Description: Start the server.
%%--------------------------------------------------------------------
-init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) ->
- process_flag( trap_exit, true ),
+init([Host, Port, TSvc, TOpts, ReconnMin, ReconnMax]) ->
+ process_flag(trap_exit, true),
- State = #state{ host = Host,
- port = Port,
- thrift_svc = TSvc,
- thrift_opts = TOpts,
- reconn_min = ReconnMin,
- reconn_max = ReconnMax,
- op_cnt_dict = dict:new(),
- op_time_dict = dict:new() },
+ State = #state{
+ host = Host,
+ port = Port,
+ thrift_svc = TSvc,
+ thrift_opts = TOpts,
+ reconn_min = ReconnMin,
+ reconn_max = ReconnMax,
+ op_cnt_dict = dict:new(),
+ op_time_dict = dict:new()
+ },
- { ok, try_connect( State ) }.
+ {ok, try_connect(State)}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -106,40 +126,44 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call( { call, Op, _ },
- _From,
- State = #state{ client = nil } ) ->
- { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) };
+handle_call(
+ {call, Op, _},
+ _From,
+ State = #state{client = nil}
+) ->
+ {reply, {error, noconn}, incr_stats(Op, "failfast", 1, State)};
+handle_call(
+ {call, Op, Args},
+ _From,
+ State = #state{client = Client}
+) ->
+ Timer = timer_fun(),
+ Result = (catch thrift_client:call(Client, Op, Args)),
+ Time = Timer(),
-handle_call( { call, Op, Args },
- _From,
- State=#state{ client = Client } ) ->
-
- Timer = timer_fun(),
- Result = ( catch thrift_client:call( Client, Op, Args) ),
- Time = Timer(),
-
- case Result of
- { C, { ok, Reply } } ->
- S = incr_stats( Op, "success", Time, State#state{ client = C } ),
- { reply, {ok, Reply }, S };
- { _, { E, Msg } } when E == error; E == exception ->
- S = incr_stats( Op, "error", Time, try_connect( State ) ),
- { reply, { E, Msg }, S };
- Other ->
- S = incr_stats( Op, "error", Time, try_connect( State ) ),
- { reply, Other, S }
- end;
-
-handle_call( get_stats,
- _From,
- State = #state{} ) ->
- { reply, stats( State ), State };
-
-handle_call( get_and_reset_stats,
- _From,
- State = #state{} ) ->
- { reply, stats( State ), reset_stats( State ) }.
+ case Result of
+ {C, {ok, Reply}} ->
+ S = incr_stats(Op, "success", Time, State#state{client = C}),
+ {reply, {ok, Reply}, S};
+ {_, {E, Msg}} when E == error; E == exception ->
+ S = incr_stats(Op, "error", Time, try_connect(State)),
+ {reply, {E, Msg}, S};
+ Other ->
+ S = incr_stats(Op, "error", Time, try_connect(State)),
+ {reply, Other, S}
+ end;
+handle_call(
+ get_stats,
+ _From,
+ State = #state{}
+) ->
+ {reply, stats(State), State};
+handle_call(
+ get_and_reset_stats,
+ _From,
+ State = #state{}
+) ->
+ {reply, stats(State), reset_stats(State)}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -147,8 +171,8 @@
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
-handle_cast( _Msg, State ) ->
- { noreply, State }.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
@@ -156,11 +180,10 @@
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
-handle_info( try_connect, State ) ->
- { noreply, try_connect( State ) };
-
-handle_info( _Info, State ) ->
- { noreply, State }.
+handle_info(try_connect, State) ->
+ {noreply, try_connect(State)};
+handle_info(_Info, State) ->
+ {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
@@ -169,90 +192,103 @@
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
-terminate( _Reason, #state{ client = Client } ) ->
- thrift_client:close( Client ),
- ok.
+terminate(_Reason, #state{client = Client}) ->
+ thrift_client:close(Client),
+ ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
-code_change( _OldVsn, State, _Extra ) ->
- { ok, State }.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-try_connect( State = #state{ client = OldClient,
- host = Host,
- port = Port,
- thrift_svc = TSvc,
- thrift_opts = TOpts } ) ->
+try_connect(
+ State = #state{
+ client = OldClient,
+ host = Host,
+ port = Port,
+ thrift_svc = TSvc,
+ thrift_opts = TOpts
+ }
+) ->
+ case OldClient of
+ nil -> ok;
+ _ -> (catch thrift_client:close(OldClient))
+ end,
- case OldClient of
- nil -> ok;
- _ -> ( catch thrift_client:close( OldClient ) )
- end,
+ case catch thrift_client_util:new(Host, Port, TSvc, TOpts) of
+ {ok, Client} ->
+ State#state{client = Client, reconn_time = 0};
+ {E, Msg} when E == error; E == exception ->
+ ReconnTime = reconn_time(State),
+ error_logger:error_msg(
+ "[~w] ~w connect failed (~w), trying again in ~w ms~n",
+ [self(), TSvc, Msg, ReconnTime]
+ ),
+ erlang:send_after(ReconnTime, self(), try_connect),
+ State#state{client = nil, reconn_time = ReconnTime}
+ end.
- case catch thrift_client_util:new( Host, Port, TSvc, TOpts ) of
- { ok, Client } ->
- State#state{ client = Client, reconn_time = 0 };
- { E, Msg } when E == error; E == exception ->
- ReconnTime = reconn_time( State ),
- error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n",
- [ self(), TSvc, Msg, ReconnTime ] ),
- erlang:send_after( ReconnTime, self(), try_connect ),
- State#state{ client = nil, reconn_time = ReconnTime }
- end.
-
-
-reconn_time( #state{ reconn_min = ReconnMin, reconn_time = 0 } ) ->
- ReconnMin;
-reconn_time( #state{ reconn_max = ReconnMax, reconn_time = ReconnMax } ) ->
- ReconnMax;
-reconn_time( #state{ reconn_max = ReconnMax, reconn_time = R } ) ->
- Backoff = 2 * R,
- case Backoff > ReconnMax of
- true -> ReconnMax;
- false -> Backoff
- end.
+reconn_time(#state{reconn_min = ReconnMin, reconn_time = 0}) ->
+ ReconnMin;
+reconn_time(#state{reconn_max = ReconnMax, reconn_time = ReconnMax}) ->
+ ReconnMax;
+reconn_time(#state{reconn_max = ReconnMax, reconn_time = R}) ->
+ Backoff = 2 * R,
+ case Backoff > ReconnMax of
+ true -> ReconnMax;
+ false -> Backoff
+ end.
-ifdef(time_correction).
timer_fun() ->
- T1 = erlang:monotonic_time(),
- fun() ->
- T2 = erlang:monotonic_time(),
- erlang:convert_time_unit(T2 - T1, native, micro_seconds)
- end.
+ T1 = erlang:monotonic_time(),
+ fun() ->
+ T2 = erlang:monotonic_time(),
+ erlang:convert_time_unit(T2 - T1, native, micro_seconds)
+ end.
-else.
timer_fun() ->
- T1 = erlang:timestamp(),
- fun() ->
- T2 = erlang:timestamp(),
- timer:now_diff(T2, T1)
- end.
+ T1 = erlang:timestamp(),
+ fun() ->
+ T2 = erlang:timestamp(),
+ timer:now_diff(T2, T1)
+ end.
-endif.
-incr_stats( Op, Result, Time,
- State = #state{ op_cnt_dict = OpCntDict,
- op_time_dict = OpTimeDict } ) ->
- Key = lists:flatten( [ atom_to_list( Op ), [ "_" | Result ] ] ),
- State#state{ op_cnt_dict = dict:update_counter( Key, 1, OpCntDict ),
- op_time_dict = dict:update_counter( Key, Time, OpTimeDict ) }.
+incr_stats(
+ Op,
+ Result,
+ Time,
+ State = #state{
+ op_cnt_dict = OpCntDict,
+ op_time_dict = OpTimeDict
+ }
+) ->
+ Key = lists:flatten([atom_to_list(Op), ["_" | Result]]),
+ State#state{
+ op_cnt_dict = dict:update_counter(Key, 1, OpCntDict),
+ op_time_dict = dict:update_counter(Key, Time, OpTimeDict)
+ }.
+stats(#state{
+ thrift_svc = TSvc,
+ op_cnt_dict = OpCntDict,
+ op_time_dict = OpTimeDict
+}) ->
+ Svc = atom_to_list(TSvc),
-stats( #state{ thrift_svc = TSvc,
- op_cnt_dict = OpCntDict,
- op_time_dict = OpTimeDict } ) ->
- Svc = atom_to_list(TSvc),
+ F = fun(Key, Count, Stats) ->
+ Name = lists:flatten([Svc, ["_" | Key]]),
+ Micros = dict:fetch(Key, OpTimeDict),
+ [{Name, Count, Micros} | Stats]
+ end,
- F = fun( Key, Count, Stats ) ->
- Name = lists:flatten( [ Svc, [ "_" | Key ] ] ),
- Micros = dict:fetch( Key, OpTimeDict ),
- [ { Name, Count, Micros } | Stats ]
- end,
+ dict:fold(F, [], OpCntDict).
- dict:fold( F, [], OpCntDict ).
-
-reset_stats( State = #state{} ) ->
- State#state{ op_cnt_dict = dict:new(), op_time_dict = dict:new() }.
+reset_stats(State = #state{}) ->
+ State#state{op_cnt_dict = dict:new(), op_time_dict = dict:new()}.