Change thrift_disk_log_transport to not flush when flush/1 is called if sync_every is defined
Summary:
For fast logging we don't want to actually flush to disk after every message.
There's force_flush/1 now if you actually want to force one
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666467 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_disk_log_transport.erl b/lib/alterl/src/thrift_disk_log_transport.erl
index 71d37a5..6ae7ded 100644
--- a/lib/alterl/src/thrift_disk_log_transport.erl
+++ b/lib/alterl/src/thrift_disk_log_transport.erl
@@ -3,16 +3,20 @@
%%% Author : Todd Lipcon <todd@amiestreet.com>
%%% Description : Write-only Thrift transport outputting to disk_log
%%% Created : 22 Apr 2008 by Todd Lipcon <todd@lipcon.org>
+%%%
+%%% Todo: this might be better off as a gen_server type of transport
+%%% that handles stuff like group commit, similar to TFileTransport
+%%% in cpp land
%%%-------------------------------------------------------------------
-module(thrift_disk_log_transport).
-behaviour(thrift_transport).
%% API
--export([new/2, new_transport_factory/2]).
+-export([new/2, new_transport_factory/2, new_transport_factory/3]).
%% thrift_transport callbacks
--export([read/2, write/2, flush/1, close/1]).
+-export([read/2, write/2, force_flush/1, flush/1, close/1]).
%% state
-record(dl_transport, {log,
@@ -26,17 +30,17 @@
%% when the transport is closed, pass a {close_on_close, true} tuple in the
%% Opts list.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
- State = #dl_transport{log = LogName},
+ State = parse_opts(Opts, #dl_transport{log = LogName}),
State2 =
case State#dl_transport.sync_every of
N when is_integer(N), N > 0 ->
- {ok, TRef} = timer:apply_interval(N, ?MODULE, flush, State),
+ {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, State),
State#dl_transport{sync_tref = TRef};
_ -> State
end,
- thrift_transport:new(?MODULE, parse_opts(Opts, State2)).
+ thrift_transport:new(?MODULE, State2).
parse_opts([], State) ->
@@ -56,9 +60,19 @@
write(#dl_transport{log = Log}, Data) ->
disk_log:balog(Log, erlang:iolist_to_binary(Data)).
-flush(#dl_transport{log = Log}) ->
+force_flush(#dl_transport{log = Log}) ->
+ error_logger:info_msg("~p syncing~n", [?MODULE]),
disk_log:sync(Log).
+flush(#dl_transport{log = Log, sync_every = SE}) ->
+ case SE of
+ undefined -> % no time-based sync
+ disk_log:sync(Log);
+ _Else -> % sync will happen automagically
+ ok
+ end.
+
+
%% On close, close the underlying log if we're configured to do so.
close(#dl_transport{close_on_close = false}) ->
ok;
@@ -69,10 +83,14 @@
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
new_transport_factory(Name, ExtraLogOpts) ->
- F = fun() -> factory_impl(Name, ExtraLogOpts) end,
+ new_transport_factory(Name, ExtraLogOpts, [{close_on_close, true},
+ {sync_every, 500}]).
+
+new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
+ F = fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end,
{ok, F}.
-factory_impl(Name, ExtraLogOpts) ->
+factory_impl(Name, ExtraLogOpts, TransportOpts) ->
LogOpts = [{name, Name},
{format, external},
{type, wrap} |
@@ -85,4 +103,4 @@
error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]),
Log
end,
- new(Log, [{close_on_close, true}]).
+ new(Log, TransportOpts).