Add thrift_disk_log_transport which writes to the disk_log module

Summary:
  See test/erl/src/test_disklog.erl for example usage

Test plan: test_disklog:t(), then hexdump -C /tmp/test_log.1


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666464 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/alterl/src/thrift_disk_log_transport.erl b/lib/alterl/src/thrift_disk_log_transport.erl
new file mode 100644
index 0000000..71d37a5
--- /dev/null
+++ b/lib/alterl/src/thrift_disk_log_transport.erl
@@ -0,0 +1,88 @@
+%%%-------------------------------------------------------------------
+%%% File    : thrift_disk_log_transport.erl
+%%% Author  : Todd Lipcon <todd@amiestreet.com>
+%%% Description : Write-only Thrift transport outputting to disk_log
+%%% Created : 22 Apr 2008 by Todd Lipcon <todd@lipcon.org>
+%%%-------------------------------------------------------------------
+-module(thrift_disk_log_transport).
+
+-behaviour(thrift_transport).
+
+%% API
+-export([new/2, new_transport_factory/2]).
+
+%% thrift_transport callbacks
+-export([read/2, write/2, flush/1, close/1]).
+
+%% state
+-record(dl_transport, {log,
+                       close_on_close = false,
+                       sync_every = infinity,
+                       sync_tref}).
+
+
+%% Create a transport attached to an already open log.
+%% If you'd like this transport to close the disk_log using disk_log:lclose()
+%% when the transport is closed, pass a {close_on_close, true} tuple in the
+%% Opts list.
+new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
+    State = #dl_transport{log = LogName},
+
+    State2 =
+        case State#dl_transport.sync_every of
+            N when is_integer(N), N > 0 ->
+                {ok, TRef} = timer:apply_interval(N, ?MODULE, flush, State),
+                State#dl_transport{sync_tref = TRef};
+            _ -> State
+        end,
+
+    thrift_transport:new(?MODULE, parse_opts(Opts, State2)).
+
+
+parse_opts([], State) ->
+    State;
+parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
+    State#dl_transport{close_on_close = Bool};
+parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
+    State#dl_transport{sync_every = Int}.
+
+
+%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% disk_log_transport is write-only
+read(_State, Len) ->
+    {error, no_read_from_disk_log}.
+
+write(#dl_transport{log = Log}, Data) ->
+    disk_log:balog(Log, erlang:iolist_to_binary(Data)).
+
+flush(#dl_transport{log = Log}) ->
+    disk_log:sync(Log).
+
+%% On close, close the underlying log if we're configured to do so.
+close(#dl_transport{close_on_close = false}) ->
+    ok;
+close(#dl_transport{log = Log}) ->
+    disk_log:lclose(Log).
+
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+new_transport_factory(Name, ExtraLogOpts) ->
+    F = fun() -> factory_impl(Name, ExtraLogOpts) end,
+    {ok, F}.
+
+factory_impl(Name, ExtraLogOpts) ->
+    LogOpts = [{name, Name},
+               {format, external},
+               {type, wrap} |
+               ExtraLogOpts],
+    Log =
+        case disk_log:open(LogOpts) of
+            {ok, Log} ->
+                Log;
+            {repaired, Log, Info1, Info2} ->
+                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]),
+                Log
+        end,
+    new(Log, [{close_on_close, true}]).
diff --git a/test/erl/Makefile b/test/erl/Makefile
index 42572d2..890c809 100644
--- a/test/erl/Makefile
+++ b/test/erl/Makefile
@@ -10,7 +10,7 @@
 ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/alterl/include
 INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR})
 
-MODULES = stress_server test_server
+MODULES = stress_server test_server test_disklog
 
 INCLUDES = 
 TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES})
diff --git a/test/erl/src/test_disklog.erl b/test/erl/src/test_disklog.erl
new file mode 100644
index 0000000..0044b83
--- /dev/null
+++ b/test/erl/src/test_disklog.erl
@@ -0,0 +1,25 @@
+-module(test_disklog).
+
+-compile(export_all).
+
+t() ->
+    {ok, TransportFactory} =
+        thrift_disk_log_transport:new_transport_factory(
+          test_disklog,
+          [{file, "/tmp/test_log"},
+           {size, {1024*1024, 10}}]),
+    {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
+                              TransportFactory, []),
+    {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift),
+
+    io:format("Client started~n"),
+    % We have to make async calls into this client only since otherwise it will try
+    % to read from the disklog and go boom.
+    {ok, ok} = thrift_client:call(Client, testAsync, [16#deadbeef]),
+    io:format("Call written~n"),
+
+    ok = thrift_client:close(Client),
+    io:format("Client closed~n"),
+
+    ok.
+