Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/Makefile b/lib/erl/src/Makefile
new file mode 100644
index 0000000..980af81
--- /dev/null
+++ b/lib/erl/src/Makefile
@@ -0,0 +1,116 @@
+# $Id: Makefile,v 1.3 2004/08/13 16:35:59 mlogan Exp $
+#
+include ../build/otp.mk
+include ../build/colors.mk
+include ../build/buildtargets.mk
+
+# ----------------------------------------------------
+# Application version
+# ----------------------------------------------------
+
+include ../vsn.mk
+APP_NAME=thrift
+PFX=thrift
+VSN=$(THRIFT_VSN)
+
+# ----------------------------------------------------
+# Install directory specification
+# WARNING: INSTALL_DIR is the command used to install a directory;
+# INSTALL_DST is the target directory.
+# ----------------------------------------------------
+INSTALL_DST = $(ERLANG_OTP)/lib/$(APP_NAME)-$(VSN)
+
+# ----------------------------------------------------
+# Target Specs
+# ----------------------------------------------------
+
+
+# MODULES: every .erl file found under the current directory, as a path
+# relative to it with the ".erl" suffix removed.
+MODULES = $(shell find . -name \*.erl | sed 's:^\./::' | sed 's/\.erl//')
+# MODULES_STRING_LIST: the same names, each prefixed with a double quote and
+# suffixed with `",' so the list can be pasted into the .app modules list.
+MODULES_STRING_LIST = $(shell find . -name \*.erl | sed 's:^\./:":' | sed 's/\.erl/",/')
+
+HRL_FILES=
+INTERNAL_HRL_FILES= $(APP_NAME).hrl
+ERL_FILES= $(MODULES:%=%.erl)
+DOC_FILES=$(ERL_FILES)
+
+APP_FILE= $(APP_NAME).app
+APPUP_FILE= $(APP_NAME).appup
+
+APP_SRC= $(APP_FILE).src
+APPUP_SRC= $(APPUP_FILE).src
+
+APP_TARGET= $(EBIN)/$(APP_FILE)
+APPUP_TARGET= $(EBIN)/$(APPUP_FILE)
+
+BEAMS= $(MODULES:%=$(EBIN)/%.$(EMULATOR))
+TARGET_FILES= $(BEAMS) $(APP_TARGET) $(APPUP_TARGET)
+
+WEB_TARGET=/var/yaws/www/$(APP_NAME)
+
+# ----------------------------------------------------
+# FLAGS
+# ----------------------------------------------------
+
+ERL_FLAGS +=
+ERL_INCLUDE = -I../include -I../../fslib/include -I../../system_status/include
+ERL_COMPILE_FLAGS += $(ERL_INCLUDE)
+
+# ----------------------------------------------------
+# Targets
+# ----------------------------------------------------
+
+all debug opt: $(EBIN) $(TARGET_FILES)
+
+#$(EBIN)/rm_logger.beam: $(APP_NAME).hrl
+include ../build/docs.mk
+
+# Note: In the open-source build clean must not destroy the preloaded
+# beam files.
+# clean: remove built targets, editor backups (*~), core files, the whole
+# $(EBIN) directory and any generated *html files.
+clean:
+ rm -f $(TARGET_FILES)
+ rm -f *~
+ rm -f core
+ rm -rf $(EBIN)
+ rm -rf *html
+
+# Create the directory that receives the compiled files.
+$(EBIN):
+ mkdir $(EBIN)
+
+# dialyzer: run Dialyzer recursively over the sources in this directory,
+# using the same include paths as the compiler.
+dialyzer: $(TARGET_FILES)
+ dialyzer --src -r . $(ERL_INCLUDE)
+
+# ----------------------------------------------------
+# Special Build Targets
+# ----------------------------------------------------
+
+# Generate the .app file from its .src template in two sed passes:
+#  1. substitute %VSN%, %PFX% and %APP_NAME%, and append the quoted,
+#     comma-terminated module list right after the %MODULES% marker;
+#  2. drop the %MODULES% marker together with the last comma on that line
+#     (the one left behind by the final module name).
+# The intermediate result lives in "<template>.tmp" and is removed afterwards.
+$(APP_TARGET): $(APP_SRC) ../vsn.mk $(BEAMS)
+ sed -e 's;%VSN%;$(VSN);' \
+ -e 's;%PFX%;$(PFX);' \
+ -e 's;%APP_NAME%;$(APP_NAME);' \
+ -e 's;%MODULES%;%MODULES%$(MODULES_STRING_LIST);' \
+ $< > $<".tmp"
+ sed -e 's/%MODULES%\(.*\),/\1/' \
+ $<".tmp" > $@
+ rm $<".tmp"
+
+# Generate the .appup file by substituting %VSN% in its .src template.
+$(APPUP_TARGET): $(APPUP_SRC) ../vsn.mk
+ sed -e 's;%VSN%;$(VSN);' $< > $@
+
+# Rebuild the web directory from scratch: delete it, recreate it, then copy
+# ../markup/ and ../skins/ into it.
+$(WEB_TARGET): ../markup/*
+ rm -rf $(WEB_TARGET)
+ mkdir $(WEB_TARGET)
+ cp -r ../markup/ $(WEB_TARGET)
+ cp -r ../skins/ $(WEB_TARGET)
+
+# ----------------------------------------------------
+# Install Target
+# ----------------------------------------------------
+
+# install: currently only builds `all' and $(WEB_TARGET). The commands that
+# would copy sources, headers and ebin files into $(INSTALL_DST) are all
+# commented out below, so nothing is installed under $(INSTALL_DST).
+install: all $(WEB_TARGET)
+# $(INSTALL_DIR) $(INSTALL_DST)/src
+# $(INSTALL_DATA) $(ERL_FILES) $(INSTALL_DST)/src
+# $(INSTALL_DATA) $(INTERNAL_HRL_FILES) $(INSTALL_DST)/src
+# $(INSTALL_DIR) $(INSTALL_DST)/include
+# $(INSTALL_DATA) $(HRL_FILES) $(INSTALL_DST)/include
+# $(INSTALL_DIR) $(INSTALL_DST)/ebin
+# $(INSTALL_DATA) $(TARGET_FILES) $(INSTALL_DST)/ebin
diff --git a/lib/erl/src/test_handler.erl b/lib/erl/src/test_handler.erl
new file mode 100644
index 0000000..28a3acd
--- /dev/null
+++ b/lib/erl/src/test_handler.erl
@@ -0,0 +1,26 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(test_handler).
+
+-export([handle_function/2]).
+
+%% Handles the `add' function of the test service: prints the received
+%% parameter tuple and replies with the sum of its two elements.
+handle_function(add, {A, B} = Params) ->
+    io:format("Got params: ~p~n", [Params]),
+    Sum = A + B,
+    {reply, Sum}.
diff --git a/lib/erl/src/test_service.erl b/lib/erl/src/test_service.erl
new file mode 100644
index 0000000..7aa4827
--- /dev/null
+++ b/lib/erl/src/test_service.erl
@@ -0,0 +1,29 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(test_service).
+%
+% Test service definition
+
+-export([function_info/2]).
+
+%% Type metadata for the `add' function:
+%%   reply_type  -> i32
+%%   params_type -> a struct whose fields 1 and 2 are both i32
+function_info(add, reply_type) ->
+    i32;
+function_info(add, params_type) ->
+    Fields = [{1, i32}, {2, i32}],
+    {struct, Fields}.
diff --git a/lib/erl/src/thrift.app.src b/lib/erl/src/thrift.app.src
new file mode 100644
index 0000000..681b3eb
--- /dev/null
+++ b/lib/erl/src/thrift.app.src
@@ -0,0 +1,44 @@
+%%% -*- mode:erlang -*-
+{application, %APP_NAME%,
+ [
+ % A quick description of the application.
+ {description, "Thrift bindings"},
+
+ % The version of the application
+ {vsn, "%VSN%"},
+
+ % All modules used by the application.
+ {modules, [
+ %MODULES%
+ ]},
+
+ % All of the registered names the application uses. This can be ignored.
+ {registered, []},
+
+ % Applications that are to be started prior to this one. This can be ignored;
+ % leave it alone unless you understand it well, and let the .rel files in
+ % your release handle this.
+ {applications,
+ [
+ kernel,
+ stdlib
+ ]},
+
+ % OTP application loader will load, but not start, included apps. Again
+ % this can be ignored as well. To load but not start an application it
+ % is easier to include it in the .rel file followed by the atom 'none'
+ {included_applications, []},
+
+ % Configuration parameters similar to those in the config file specified
+ % on the command line. They can be fetched with application:get_env
+ {env, [
+ % If an error/crash occurs during processing of a function,
+ % should the TApplicationException serialized back to the client
+ % include the erlang backtrace?
+ {exceptions_include_traces, true}
+ ]},
+
+ % The Module and Args used to start this application.
+ {mod, {thrift_app, []}}
+ ]
+}.
diff --git a/lib/erl/src/thrift.appup.src b/lib/erl/src/thrift.appup.src
new file mode 100644
index 0000000..54a6383
--- /dev/null
+++ b/lib/erl/src/thrift.appup.src
@@ -0,0 +1 @@
+{"%VSN%",[],[]}.
diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl
new file mode 100644
index 0000000..9d13151
--- /dev/null
+++ b/lib/erl/src/thrift_base64_transport.erl
@@ -0,0 +1,64 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_base64_transport).
+
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1, new_transport_factory/1]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+%% State
+-record(b64_transport, {wrapped}).
+
+%% Builds a base64 transport around Wrapped by handing this module and a
+%% fresh #b64_transport{} state to thrift_transport:new/2.
+new(Wrapped) ->
+    thrift_transport:new(?MODULE, #b64_transport{wrapped = Wrapped}).
+
+
+%% Flattens Data (an iolist) to a binary, base64-encodes it and writes the
+%% encoded bytes to the wrapped transport.
+write(#b64_transport{wrapped = Inner}, Data) ->
+    Plain = iolist_to_binary(Data),
+    Encoded = base64:encode(Plain),
+    thrift_transport:write(Inner, Encoded).
+
+
+%% base64 doesn't support reading quite yet since it would involve
+%% nasty buffering and such.
+%% Always returns {error, no_reads_allowed}; neither the wrapped transport
+%% nor the requested length is consulted, so both are left unbound to avoid
+%% unused-variable warnings.
+read(#b64_transport{}, _Len) ->
+    {error, no_reads_allowed}.
+
+
+%% Writes a single newline to the wrapped transport and then flushes it.
+%% Returns the result of the wrapped flush; the write result is not checked.
+flush(#b64_transport{wrapped = Inner}) ->
+    Newline = <<"\n">>,
+    thrift_transport:write(Inner, Newline),
+    thrift_transport:flush(Inner).
+
+
+%% Flushes this transport (see flush/1) and then closes the wrapped one.
+close(#b64_transport{wrapped = Inner} = Transport) ->
+    flush(Transport),
+    thrift_transport:close(Inner).
+
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% new_transport_factory(WrapFactory) -> {ok, Factory}
+%%
+%% Factory is a zero-arity fun. Each call invokes WrapFactory and, when that
+%% yields {ok, Wrapped}, returns new(Wrapped). An {error, Reason} result from
+%% WrapFactory is handed back unchanged instead of crashing with badmatch,
+%% so the caller can decide how to react to a failed connection.
+new_transport_factory(WrapFactory) ->
+    F = fun() ->
+            case WrapFactory() of
+                {ok, Wrapped} ->
+                    new(Wrapped);
+                {error, _Reason} = Error ->
+                    Error
+            end
+        end,
+    {ok, F}.
diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl
new file mode 100644
index 0000000..ad53384
--- /dev/null
+++ b/lib/erl/src/thrift_binary_protocol.erl
@@ -0,0 +1,325 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_binary_protocol).
+
+-behavior(thrift_protocol).
+
+-include("thrift_constants.hrl").
+-include("thrift_protocol.hrl").
+
+-export([new/1, new/2,
+ read/2,
+ write/2,
+ flush_transport/1,
+ close_transport/1,
+
+ new_protocol_factory/2
+ ]).
+
+-record(binary_protocol, {transport,
+ strict_read=true,
+ strict_write=true
+ }).
+
+-define(VERSION_MASK, 16#FFFF0000).
+-define(VERSION_1, 16#80010000).
+-define(TYPE_MASK, 16#000000ff).
+
+%% Same as new(Transport, []): creates a protocol with no extra options.
+new(Transport) ->
+    new(Transport, []).
+
+%% Creates a binary protocol on top of Transport. Options is a list of
+%% {strict_read, boolean()} / {strict_write, boolean()} tuples; both flags
+%% default to true (see the #binary_protocol{} record).
+new(Transport, Options) ->
+    Defaults = #binary_protocol{transport = Transport},
+    thrift_protocol:new(?MODULE, parse_options(Options, Defaults)).
+
+%% Folds the option list into the protocol state. Only strict_read and
+%% strict_write with boolean values are accepted; anything else fails with
+%% function_clause.
+parse_options([{strict_write, Flag} | Tail], State) when is_boolean(Flag) ->
+    parse_options(Tail, State#binary_protocol{strict_write = Flag});
+parse_options([{strict_read, Flag} | Tail], State) when is_boolean(Flag) ->
+    parse_options(Tail, State#binary_protocol{strict_read = Flag});
+parse_options([], State) ->
+    State.
+
+
+%% Flushes the transport this protocol instance writes to.
+flush_transport(#binary_protocol{transport = Trans}) ->
+    thrift_transport:flush(Trans).
+
+%% Closes the transport this protocol instance writes to.
+close_transport(#binary_protocol{transport = Trans}) ->
+    thrift_transport:close(Trans).
+
+%%%
+%%% instance methods
+%%%
+
+%% write(This, Item)
+%%
+%% Serializes one protocol item to the transport held in This.
+%%   * #protocol_*_begin{} records, field_stop and the *_end / struct markers
+%%     return ok; the results of the nested writes they perform are not
+%%     checked.
+%%   * {bool | byte | i16 | i32 | i64 | double | string, Value} are encoded
+%%     big-endian and return the result of the final transport write.
+%%   * Any other term is treated as an iolist and passed straight to
+%%     thrift_transport:write/2.
+write(This, #protocol_message_begin{name = Name,
+                                    type = Type,
+                                    seqid = Seqid}) ->
+    case This#binary_protocol.strict_write of
+        true ->
+            %% Strict header: version word with the message type in its
+            %% low bits, then name and sequence id.
+            write(This, {i32, ?VERSION_1 bor Type}),
+            write(This, {string, Name}),
+            write(This, {i32, Seqid});
+        false ->
+            %% Old-style header: name, type byte, sequence id.
+            write(This, {string, Name}),
+            write(This, {byte, Type}),
+            write(This, {i32, Seqid})
+    end,
+    ok;
+
+write(_This, message_end) -> ok;
+
+write(This, #protocol_field_begin{name = _Name,
+                                  type = Type,
+                                  id = Id}) ->
+    write(This, {byte, Type}),
+    write(This, {i16, Id}),
+    ok;
+
+write(This, field_stop) ->
+    write(This, {byte, ?tType_STOP}),
+    ok;
+
+write(_This, field_end) -> ok;
+
+write(This, #protocol_map_begin{ktype = Ktype,
+                                vtype = Vtype,
+                                size = Size}) ->
+    write(This, {byte, Ktype}),
+    write(This, {byte, Vtype}),
+    write(This, {i32, Size}),
+    ok;
+
+write(_This, map_end) -> ok;
+
+write(This, #protocol_list_begin{etype = Etype,
+                                 size = Size}) ->
+    write(This, {byte, Etype}),
+    write(This, {i32, Size}),
+    ok;
+
+write(_This, list_end) -> ok;
+
+write(This, #protocol_set_begin{etype = Etype,
+                                size = Size}) ->
+    write(This, {byte, Etype}),
+    write(This, {i32, Size}),
+    ok;
+
+write(_This, set_end) -> ok;
+
+write(_This, #protocol_struct_begin{}) -> ok;
+write(_This, struct_end) -> ok;
+
+write(This, {bool, true}) -> write(This, {byte, 1});
+write(This, {bool, false}) -> write(This, {byte, 0});
+
+write(This, {byte, Byte}) ->
+    write(This, <<Byte:8/big-signed>>);
+
+write(This, {i16, I16}) ->
+    write(This, <<I16:16/big-signed>>);
+
+write(This, {i32, I32}) ->
+    write(This, <<I32:32/big-signed>>);
+
+write(This, {i64, I64}) ->
+    write(This, <<I64:64/big-signed>>);
+
+write(This, {double, Double}) ->
+    write(This, <<Double:64/big-signed-float>>);
+
+write(This, {string, Str}) when is_list(Str) ->
+    %% Convert before writing anything: the length prefix must count the
+    %% bytes actually sent, and length/1 is wrong for deep iolists or lists
+    %% containing binaries. A bad list now fails before a prefix is written.
+    write(This, {string, list_to_binary(Str)});
+
+write(This, {string, Bin}) when is_binary(Bin) ->
+    write(This, {i32, byte_size(Bin)}),
+    write(This, Bin);
+
+%% Data :: iolist()
+write(This, Data) ->
+    thrift_transport:write(This#binary_protocol.transport, Data).
+
+%%
+
+%% read(This, What)
+%%
+%% Reads one protocol item from the transport held in This.
+%%   * message_begin / field_begin / map_begin / list_begin / set_begin
+%%     return the matching #protocol_*_begin{} record (message_begin may
+%%     instead return {error, Reason}).
+%%   * The *_end markers, struct_begin and field_stop return ok.
+%%   * bool, byte, i16, i32, ui32, i64, double and string return {ok, Value},
+%%     or pass through whatever non-matching result the transport gave.
+%%   * A non-negative integer reads that many bytes from the transport.
+read(This, message_begin) ->
+    case read(This, ui32) of
+        {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
+            %% we're at version 1
+            {ok, Name} = read(This, string),
+            Type = Sz band ?TYPE_MASK,
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name = binary_to_list(Name),
+                                    type = Type,
+                                    seqid = SeqId};
+
+        {ok, Sz} when Sz band 16#80000000 =/= 0 ->
+            %% The top bit marks a version header, but it is not VERSION_1.
+            %% The word is read unsigned, so the high bit has to be tested
+            %% explicitly; a `Sz < 0' guard can never match here.
+            {error, {bad_binary_protocol_version, Sz}};
+
+        {ok, _Sz} when This#binary_protocol.strict_read =:= true ->
+            %% strict_read is true and there's no version header; that's an error
+            {error, no_binary_protocol_version};
+
+        {ok, Sz} when This#binary_protocol.strict_read =:= false ->
+            %% strict_read is false, so just read the old way: the word we
+            %% already consumed is the length of the message name
+            {ok, Name} = read(This, Sz),
+            {ok, Type} = read(This, byte),
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name = binary_to_list(Name),
+                                    type = Type,
+                                    seqid = SeqId};
+
+        Err = {error, closed} -> Err;
+        Err = {error, timeout} -> Err;
+        Err = {error, ebadf} -> Err
+    end;
+
+read(_This, message_end) -> ok;
+
+read(_This, struct_begin) -> ok;
+read(_This, struct_end) -> ok;
+
+read(This, field_begin) ->
+    case read(This, byte) of
+        {ok, Type = ?tType_STOP} ->
+            %% a stop field carries no id
+            #protocol_field_begin{type = Type};
+        {ok, Type} ->
+            {ok, Id} = read(This, i16),
+            #protocol_field_begin{type = Type,
+                                  id = Id}
+    end;
+
+read(_This, field_end) -> ok;
+
+read(This, map_begin) ->
+    {ok, Ktype} = read(This, byte),
+    {ok, Vtype} = read(This, byte),
+    {ok, Size} = read(This, i32),
+    #protocol_map_begin{ktype = Ktype,
+                        vtype = Vtype,
+                        size = Size};
+read(_This, map_end) -> ok;
+
+read(This, list_begin) ->
+    {ok, Etype} = read(This, byte),
+    {ok, Size} = read(This, i32),
+    #protocol_list_begin{etype = Etype,
+                         size = Size};
+read(_This, list_end) -> ok;
+
+read(This, set_begin) ->
+    {ok, Etype} = read(This, byte),
+    {ok, Size} = read(This, i32),
+    #protocol_set_begin{etype = Etype,
+                        size = Size};
+read(_This, set_end) -> ok;
+
+read(This, field_stop) ->
+    {ok, ?tType_STOP} = read(This, byte),
+    ok;
+
+%%
+
+read(This, bool) ->
+    case read(This, byte) of
+        {ok, Byte} -> {ok, Byte /= 0};
+        Else -> Else
+    end;
+
+read(This, byte) ->
+    case read(This, 1) of
+        {ok, <<Val:8/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+read(This, i16) ->
+    case read(This, 2) of
+        {ok, <<Val:16/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+read(This, i32) ->
+    case read(This, 4) of
+        {ok, <<Val:32/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+%% unsigned ints aren't used by thrift itself, but it's used for the parsing
+%% of the packet version header. Without this special function BEAM works fine
+%% but hipe thinks it received a bad version header.
+read(This, ui32) ->
+    case read(This, 4) of
+        {ok, <<Val:32/integer-unsigned-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+read(This, i64) ->
+    case read(This, 8) of
+        {ok, <<Val:64/integer-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+read(This, double) ->
+    case read(This, 8) of
+        {ok, <<Val:64/float-signed-big, _/binary>>} -> {ok, Val};
+        Else -> Else
+    end;
+
+% returns a binary directly, call binary_to_list if necessary
+read(This, string) ->
+    {ok, Sz} = read(This, i32),
+    %% assertive match kept on purpose: a failed payload read crashes here
+    {ok, _Bin} = read(This, Sz);
+
+read(_This, 0) -> {ok, <<>>};
+read(This, Len) when is_integer(Len), Len >= 0 ->
+    thrift_transport:read(This#binary_protocol.transport, Len).
+
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% Options understood by new_protocol_factory/2; both default to true.
+-record(tbp_opts, {strict_read = true,
+                   strict_write = true}).
+
+%% Folds {strict_read, Bool} / {strict_write, Bool} tuples into a #tbp_opts{}.
+%% Any other option fails with function_clause.
+parse_factory_options([], Opts) ->
+    Opts;
+parse_factory_options([{strict_read, Bool} | Rest], Opts) when is_boolean(Bool) ->
+    parse_factory_options(Rest, Opts#tbp_opts{strict_read = Bool});
+parse_factory_options([{strict_write, Bool} | Rest], Opts) when is_boolean(Bool) ->
+    parse_factory_options(Rest, Opts#tbp_opts{strict_write = Bool}).
+
+
+%% new_protocol_factory(TransportFactory, Options) -> {ok, Factory}
+%%
+%% Factory is a zero-arity fun. Each call invokes TransportFactory and, on
+%% {ok, Transport}, returns thrift_binary_protocol:new(Transport, ProtoOpts).
+%% An {error, Reason} from TransportFactory (for instance a failed connect)
+%% is returned unchanged instead of crashing with badmatch, so that the
+%% caller of Factory can handle it.
+new_protocol_factory(TransportFactory, Options) ->
+    ParsedOpts = parse_factory_options(Options, #tbp_opts{}),
+    ProtoOpts = [{strict_read, ParsedOpts#tbp_opts.strict_read},
+                 {strict_write, ParsedOpts#tbp_opts.strict_write}],
+    F = fun() ->
+            case TransportFactory() of
+                {ok, Transport} ->
+                    thrift_binary_protocol:new(Transport, ProtoOpts);
+                {error, _Reason} = Error ->
+                    Error
+            end
+        end,
+    {ok, F}.
+
diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl
new file mode 100644
index 0000000..ebc16bd
--- /dev/null
+++ b/lib/erl/src/thrift_buffered_transport.erl
@@ -0,0 +1,180 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_buffered_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1, new_transport_factory/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(buffered_transport, {wrapped, % a thrift_transport
+ write_buffer % iolist()
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new(WrappedTransport)
+%%
+%% Description: Starts a linked gen_server that buffers writes for
+%% WrappedTransport. On success returns the result of
+%% thrift_transport:new/2 for the new process; otherwise returns whatever
+%% gen_server:start_link/3 returned.
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} -> thrift_transport:new(?MODULE, Pid);
+        Failure -> Failure
+    end.
+
+
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Appends Data to the server's write buffer; nothing reaches
+%% the wrapped transport until flush/1 or close/1.
+%%--------------------------------------------------------------------
+write(Transport, Data) ->
+    Request = {write, Data},
+    gen_server:call(Transport, Request).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport)
+%%
+%% Description: Asks the server to write its buffer to the wrapped
+%% transport and flush it. Returns the server's reply to the flush call.
+%%--------------------------------------------------------------------
+flush(Transport) ->
+    Request = flush,
+    gen_server:call(Transport, Request).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Asynchronously asks the server to close. The request is a
+%% cast, so this returns before the transport has actually been closed.
+%%--------------------------------------------------------------------
+close(Transport) ->
+    Request = close,
+    gen_server:cast(Transport, Request).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads data through from the wrapped transport. The call to
+%% the server uses a 10000 ms timeout.
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) ->
+    Timeout = 10000,
+    gen_server:call(Transport, {read, Len}, Timeout).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([Wrapped]) -> {ok, State}
+%% Description: Initiates the server with the wrapped transport and an
+%% empty write buffer.
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    InitialState = #buffered_transport{wrapped = Wrapped,
+                                       write_buffer = []},
+    {ok, InitialState}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
+%% Description: Handling call messages
+%%   flush         - writes the buffer to the wrapped transport, flushes it,
+%%                   empties the buffer and replies with the write result
+%%   {write, Data} - appends Data to the buffer and replies ok
+%%   {read, Len}   - replies with the result of reading Len bytes from the
+%%                   wrapped transport
+%%--------------------------------------------------------------------
+handle_call(flush, _From,
+            #buffered_transport{write_buffer = Pending,
+                                wrapped = Inner} = State) ->
+    WriteResult = thrift_transport:write(Inner, Pending),
+    thrift_transport:flush(Inner),
+    Emptied = State#buffered_transport{write_buffer = []},
+    {reply, WriteResult, Emptied};
+
+handle_call({write, Data}, _From,
+            #buffered_transport{write_buffer = Pending} = State) ->
+    %% nest rather than append: the buffer is an iolist
+    Grown = State#buffered_transport{write_buffer = [Pending, Data]},
+    {reply, ok, Grown};
+
+handle_call({read, Len}, _From,
+            #buffered_transport{wrapped = Inner} = State) ->
+    {reply, thrift_transport:read(Inner, Len), State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages. `close' writes the pending buffer to
+%% the wrapped transport and stops the server; any other cast is ignored.
+%%--------------------------------------------------------------------
+handle_cast(close, #buffered_transport{write_buffer = Pending,
+                                       wrapped = Inner} = State) ->
+    thrift_transport:write(Inner, Pending),
+    %% Wrapped is closed by terminate/2
+    {stop, normal, State};
+handle_cast(_Msg, #buffered_transport{} = State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State}
+%% Description: Handling all non call/cast messages; every such message is
+%% ignored and the state is left untouched.
+%%--------------------------------------------------------------------
+handle_info(_Unexpected, Unchanged) ->
+    {noreply, Unchanged}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> ok
+%% Description: Called by the gen_server when it is about to terminate.
+%% Closes the wrapped transport; the termination reason is not inspected.
+%%--------------------------------------------------------------------
+terminate(_Reason, #buffered_transport{wrapped = Inner}) ->
+    thrift_transport:close(Inner),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed. No conversion
+%% is performed: the state is returned as it is.
+%%--------------------------------------------------------------------
+code_change(_OldVsn, Unchanged, _Extra) ->
+    {ok, Unchanged}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% new_transport_factory(WrapFactory) -> {ok, Factory}
+%%
+%% Factory is a zero-arity fun. Each call invokes WrapFactory and, when that
+%% yields {ok, Wrapped}, returns new(Wrapped). An {error, Reason} result from
+%% WrapFactory is handed back unchanged instead of crashing with badmatch,
+%% so the caller can decide how to react to a failed connection.
+new_transport_factory(WrapFactory) ->
+    F = fun() ->
+            case WrapFactory() of
+                {ok, Wrapped} ->
+                    new(Wrapped);
+                {error, _Reason} = Error ->
+                    Error
+            end
+        end,
+    {ok, F}.
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
new file mode 100644
index 0000000..5ba8aee
--- /dev/null
+++ b/lib/erl/src/thrift_client.erl
@@ -0,0 +1,330 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_client).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, start_link/3, start_link/4, call/3, send_call/3, close/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+
+-include("thrift_constants.hrl").
+-include("thrift_protocol.hrl").
+
+-record(state, {service, protocol, seqid}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link(Host, Port, Service)
+%% Description: Starts a client for Service over a socket to Host:Port
+%% with an empty option list; see start_link/4.
+%%--------------------------------------------------------------------
+start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
+    NoOptions = [],
+    start_link(Host, Port, Service, NoOptions).
+
+
+%%
+%% Splits client options into protocol options and transport options
+%%
+%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
+%%
+%% strict_read / strict_write go to the protocol; framed, connect_timeout
+%% and sockopts go to the transport. Each result list is in reverse input
+%% order. Any other option fails with function_clause.
+%%
+split_options(Options) ->
+    split_options(Options, [], []).
+
+split_options([{Key, _} = Opt | Tail], Proto, Trans)
+  when Key =:= strict_read orelse Key =:= strict_write ->
+    split_options(Tail, [Opt | Proto], Trans);
+split_options([{Key, _} = Opt | Tail], Proto, Trans)
+  when Key =:= framed orelse
+       Key =:= connect_timeout orelse
+       Key =:= sockopts ->
+    split_options(Tail, Proto, [Opt | Trans]);
+split_options([], Proto, Trans) ->
+    {Proto, Trans}.
+
+
+%% Backwards-compatible starter for the common case of socket transports.
+%% Splits Options into protocol and transport options, builds a socket
+%% transport factory and a binary protocol factory on top of it, then
+%% delegates to start_link/2.
+start_link(Host, Port, Service, Options)
+  when is_integer(Port), is_atom(Service), is_list(Options) ->
+    {ProtocolOptions, TransportOptions} = split_options(Options),
+    {ok, MakeTransport} =
+        thrift_socket_transport:new_transport_factory(Host, Port,
+                                                      TransportOptions),
+    {ok, MakeProtocol} =
+        thrift_binary_protocol:new_protocol_factory(MakeTransport,
+                                                    ProtocolOptions),
+    start_link(MakeProtocol, Service).
+
+
+%% ProtocolFactory :: fun() -> thrift_protocol()
+%%
+%% Starts the client process for Service and then asks it to connect using
+%% ProtocolFactory. Returns {ok, Pid} once connected; otherwise returns the
+%% failing result of either the start or the connect step.
+start_link(ProtocolFactory, Service)
+  when is_function(ProtocolFactory), is_atom(Service) ->
+    case gen_server:start_link(?MODULE, [Service], []) of
+        {ok, Pid} ->
+            ConnectResult = gen_server:call(Pid, {connect, ProtocolFactory}),
+            case ConnectResult of
+                ok -> {ok, Pid};
+                ConnectFailure -> ConnectFailure
+            end;
+        StartFailure ->
+            StartFailure
+    end.
+
+%% Performs a synchronous call of Function with Args through Client.
+%% Returns the {ok, _} or {error, _} reply as is; an {exception, E} reply
+%% is rethrown in the caller as throw(E).
+call(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    Reply = gen_server:call(Client, {call, Function, Args}),
+    case Reply of
+        {exception, Exception} -> throw(Exception);
+        {ok, _} -> Reply;
+        {error, _} -> Reply
+    end.
+
+%% Fire-and-forget variant of call/3: sends the {call, Function, Args}
+%% request to Client as a gen_server cast.
+%% NOTE(review): cast/3 is not listed in this module's -export attribute.
+cast(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    Request = {call, Function, Args},
+    gen_server:cast(Client, Request).
+
+%% Sends a function call but does not read the result. This is useful
+%% if you're trying to log non-oneway function calls to write-only
+%% transports like thrift_disk_log_transport.
+send_call(Client, Function, Args)
+  when is_pid(Client), is_atom(Function), is_list(Args) ->
+    Request = {send_call, Function, Args},
+    gen_server:call(Client, Request).
+
+%% Asynchronously asks the client process to stop (sent as a cast).
+close(Client) when is_pid(Client) ->
+    Request = close,
+    gen_server:cast(Client, Request).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([Service]) -> {ok, State}
+%% Description: Initiates the server. Only the service is stored here; the
+%% protocol and sequence id are set by the later {connect, _} call.
+%%--------------------------------------------------------------------
+init([Service]) ->
+    InitialState = #state{service = Service},
+    {ok, InitialState}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                                {stop, Reason, Reply, State}
+%% Description: Handling call messages
+%%   {connect, ProtocolFactory} - runs the factory; on {ok, Protocol} stores
+%%       it, resets seqid to 0 and replies ok; any other factory result is
+%%       sent back as the reply and the server stops normally
+%%   {call, Function, Args}      - sends the call and reads its result
+%%   {send_call, Function, Args} - sends the call without reading a result
+%%--------------------------------------------------------------------
+handle_call({connect, ProtocolFactory}, _From, #state{} = State) ->
+    case ProtocolFactory() of
+        {ok, Protocol} ->
+            Connected = State#state{protocol = Protocol,
+                                    seqid = 0},
+            {reply, ok, Connected};
+        Error ->
+            {stop, normal, Error, State}
+    end;
+
+handle_call({call, Function, Args}, _From,
+            #state{service = Service} = State) ->
+    SendAndReceive =
+        fun() ->
+            ok = send_function_call(State, Function, Args),
+            receive_function_result(State, Function)
+        end,
+    {reply, catch_function_exceptions(SendAndReceive, Service), State};
+
+handle_call({send_call, Function, Args}, _From,
+            #state{service = Service} = State) ->
+    SendOnly =
+        fun() ->
+            send_function_call(State, Function, Args)
+        end,
+    {reply, catch_function_exceptions(SendOnly, Service), State}.
+
+
+%% Helper function that catches exceptions thrown by sending or receiving
+%% a function and returns the correct response for call or send_only above.
+%%
+%%   * throw:{return, Return} yields Return.
+%%   * error:function_clause whose top stack frame is
+%%     Service:function_info(Function, _) yields
+%%     {error, {no_function, Function}}.
+%%   * any other function_clause is rethrown as
+%%     throw({error, {function_clause, StackTrace}}).
+%%
+%% The top frame is matched both as {M, F, Args} and as
+%% {M, F, Args, Location}: runtimes that add location info to stack frames
+%% use the 4-tuple, which the 3-tuple pattern alone would never match.
+catch_function_exceptions(Fun, Service) ->
+    try
+        Fun()
+    catch
+        throw:{return, Return} ->
+            Return;
+        error:function_clause ->
+            ST = erlang:get_stacktrace(),
+            case hd(ST) of
+                {Service, function_info, [Function, _]} ->
+                    {error, {no_function, Function}};
+                {Service, function_info, [Function, _], _Location} ->
+                    {error, {no_function, Function}};
+                _ ->
+                    throw({error, {function_clause, ST}})
+            end
+    end.
+
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%   {call, Function, Args} - sends the call and reads its result; the
+%%       result is dropped and any exception is logged and ignored
+%%   close                  - stops the server normally
+%%   anything else          - ignored
+%%--------------------------------------------------------------------
+handle_cast({call, Function, Args}, #state{} = State) ->
+    try
+        ok = send_function_call(State, Function, Args),
+        receive_function_result(State, Function)
+    catch
+        Class:Reason ->
+            error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
+    end,
+    {noreply, State};
+
+handle_cast(close, #state{} = State) ->
+    {stop, normal, State};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State}
+%%
+%% Description: Handling all non call/cast messages. Every such message
+%% is dropped and the state is left untouched.
+%%--------------------------------------------------------------------
+handle_info(_Unexpected, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> ok
+%% Description: Called by gen_server when the client is about to stop.
+%% Closes the protocol's transport when a protocol is stored in the state
+%% (the field is not 'undefined'); otherwise there is nothing to release.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, #state{protocol = undefined}) ->
+    ok;
+terminate(_Reason, #state{protocol = Proto}) ->
+    thrift_protocol:close_transport(Proto),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Code-upgrade hook; the state is carried over unchanged.
+%%--------------------------------------------------------------------
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%% Writes one CALL message for Function to the protocol and flushes it.
+%% Throws {return, {error, {bad_args, Function, Args}}} when the number of
+%% Args differs from the number of fields in the function's params struct.
+send_function_call(#state{protocol = Proto, service = Service, seqid = SeqId},
+                   Function, Args) ->
+    {struct, FieldDefs} = ParamsType =
+        Service:function_info(Function, params_type),
+    case FieldDefs of
+        _ when length(FieldDefs) =/= length(Args) ->
+            throw({return, {error, {bad_args, Function, Args}}});
+        _ ->
+            ok
+    end,
+    Header = #protocol_message_begin{name = atom_to_list(Function),
+                                     type = ?tMessageType_CALL,
+                                     seqid = SeqId},
+    ok = thrift_protocol:write(Proto, Header),
+    ok = thrift_protocol:write(Proto, {ParamsType, list_to_tuple([Function | Args])}),
+    ok = thrift_protocol:write(Proto, message_end),
+    thrift_protocol:flush_transport(Proto),
+    ok.
+
+%% Looks up the reply type the service declares for Function and reads the
+%% corresponding result from the protocol via read_result/3.
+receive_function_result(State = #state{service = Service}, Function) ->
+    ReplyType = Service:function_info(Function, reply_type),
+    read_result(State, Function, ReplyType).
+
+%% Reads the response to a call. For a oneway_void reply type nothing is read
+%% and {ok, ok} is returned. Otherwise the message header decides: a seqid
+%% other than the one in the state is an error, an EXCEPTION or REPLY message
+%% is decoded by the matching helper.
+read_result(_State, _Function, oneway_void) ->
+    {ok, ok};
+
+read_result(State = #state{protocol = Proto, seqid = ExpectedSeqId},
+            Function,
+            ReplyType) ->
+    Header = thrift_protocol:read(Proto, message_begin),
+    case Header of
+        #protocol_message_begin{seqid = Got} when Got =/= ExpectedSeqId ->
+            {error, {bad_seq_id, ExpectedSeqId}};
+        #protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
+            handle_application_exception(State);
+        #protocol_message_begin{type = ?tMessageType_REPLY} ->
+            handle_reply(State, Function, ReplyType)
+    end.
+
+%% Decodes a REPLY message body. The reply struct is field 0 (the return
+%% value, of ReplyType) followed by the function's declared exception fields.
+%% Returns {exception, E} when exactly one exception field is set; with none
+%% set, {ok, ok} for an empty-struct reply type and {ok, Value} otherwise.
+handle_reply(#state{protocol = Proto, service = Service}, Function, ReplyType) ->
+    {struct, ExceptionFields} = Service:function_info(Function, exceptions),
+    ReplyStructDef = {struct, [{0, ReplyType} | ExceptionFields]},
+    {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
+    ReplyList = tuple_to_list(Reply),
+    %% One slot for the return value plus one per declared exception.
+    true = length(ReplyList) == length(ExceptionFields) + 1,
+    [ReturnValue | ExceptionVals] = ReplyList,
+    Outcome =
+        case [E || E <- ExceptionVals, E =/= undefined] of
+            [] when ReplyType == {struct, []} ->
+                {ok, ok};
+            [] ->
+                {ok, ReturnValue};
+            [Exception] ->
+                {exception, Exception}
+        end,
+    ok = thrift_protocol:read(Proto, message_end),
+    Outcome.
+
+%% Reads a TApplicationException body plus message_end, tags the decoded
+%% tuple as a 'TApplicationException' record, logs it, and returns it.
+handle_application_exception(#state{protocol = Proto}) ->
+    {ok, Fields} = thrift_protocol:read(Proto, ?TApplicationException_Structure),
+    ok = thrift_protocol:read(Proto, message_end),
+    XRecord = list_to_tuple(['TApplicationException' | tuple_to_list(Fields)]),
+    error_logger:error_msg("X: ~p~n", [XRecord]),
+    true = is_record(XRecord, 'TApplicationException'),
+    {exception, XRecord}.
diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl
new file mode 100644
index 0000000..761fa30
--- /dev/null
+++ b/lib/erl/src/thrift_disk_log_transport.erl
@@ -0,0 +1,118 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+%%% Todo: this might be better off as a gen_server type of transport
+%%% that handles stuff like group commit, similar to TFileTransport
+%%% in cpp land
+-module(thrift_disk_log_transport).
+
+-behaviour(thrift_transport).
+
+%% API
+-export([new/2, new_transport_factory/2, new_transport_factory/3]).
+
+%% thrift_transport callbacks
+-export([read/2, write/2, force_flush/1, flush/1, close/1]).
+
+%% Transport state
+-record(dl_transport, {log, % disk_log name handed to disk_log:balog/sync/lclose
+ close_on_close = false, % true: close/1 also closes the log (disk_log:lclose/1)
+ sync_every = infinity, % interval passed to timer:apply_interval/4; infinity: no timer
+ sync_tref}). % timer reference from timer:apply_interval/4, if a timer was started
+
+
+%% Create a transport attached to an already open log.
+%% Opts is a list of {close_on_close, boolean()} (close the disk_log with
+%% disk_log:lclose/1 when the transport is closed) and/or
+%% {sync_every, Millis} (call force_flush/1 every Millis milliseconds).
+new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
+    State = parse_opts(Opts, #dl_transport{log = LogName}),
+
+    State2 =
+        case State#dl_transport.sync_every of
+            N when is_integer(N), N > 0 ->
+                %% The last argument is the argument LIST for force_flush/1.
+                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
+                State#dl_transport{sync_tref = TRef};
+            _ -> State
+        end,
+    thrift_transport:new(?MODULE, State2).
+
+%% Fold every option in the list into State; an unknown option crashes.
+parse_opts([], State) ->
+    State;
+parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
+parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
+    parse_opts(Rest, State#dl_transport{sync_every = Int}).
+
+
+%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% disk_log_transport is write-only: every read is refused.
+read(_State, _Len) ->
+    {error, no_read_from_disk_log}.
+
+write(#dl_transport{log = LogName}, IoData) -> % logged as a single binary term
+    disk_log:balog(LogName, iolist_to_binary(IoData)).
+
+force_flush(#dl_transport{log = LogName}) -> % target of the interval timer set up in new/2
+    error_logger:info_msg("~p syncing~n", [?MODULE]),
+    disk_log:sync(LogName).
+
+%% With a periodic sync interval configured, the timer started in new/2 calls
+%% force_flush/1, so flush is a no-op. Otherwise (sync_every left at its
+%% default, infinity) every flush syncs the log to disk.
+flush(#dl_transport{sync_every = Every}) when is_integer(Every) ->
+    ok;
+flush(#dl_transport{log = Log}) ->
+    disk_log:sync(Log).
+
+%% On close, stop the periodic sync timer and close the log if configured to.
+close(#dl_transport{log = Log, close_on_close = CloseLog, sync_tref = TRef}) ->
+    _ = [timer:cancel(TRef) || TRef =/= undefined],
+    if CloseLog -> disk_log:lclose(Log);
+       true -> ok
+    end.
+
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+new_transport_factory(Name, ExtraLogOpts) -> % default transport options
+    DefaultOpts = [{close_on_close, true}, {sync_every, 500}],
+    new_transport_factory(Name, ExtraLogOpts, DefaultOpts).
+
+new_transport_factory(Name, ExtraLogOpts, TransportOpts) -> % log is opened when the fun runs
+    Factory = fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end,
+    {ok, Factory}.
+
+%% Opens the disk_log named Name as an external-format wrap log (plus any
+%% ExtraLogOpts), logs a notice if disk_log reports that it was repaired,
+%% and wraps the open log in a transport via new/2.
+factory_impl(Name, ExtraLogOpts, TransportOpts) ->
+    LogOpts = [{name, Name}, {format, external}, {type, wrap} | ExtraLogOpts],
+    Opened =
+        case disk_log:open(LogOpts) of
+            {ok, L} ->
+                L;
+            {repaired, L, Info1, Info2} ->
+                error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [L, Info1, Info2]),
+                L
+        end,
+    new(Opened, TransportOpts).
diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl
new file mode 100644
index 0000000..5ac2dbe
--- /dev/null
+++ b/lib/erl/src/thrift_file_transport.erl
@@ -0,0 +1,87 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_file_transport).
+
+-behaviour(thrift_transport).
+
+-export([new_reader/1,
+ new/1,
+ new/2,
+ write/2, read/2, flush/1, close/1]).
+
+-record(t_file_transport, {device, % io device handed to file:read/write/sync/close
+ should_close = true, % true: close/1 closes the device
+ mode = write}). % read | write; selects which of read/2 and write/2 is served
+
+%%%% CONSTRUCTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+new_reader(Filename) -> % read-mode transport that closes the file on close
+    ReadOpts = [read, binary, {read_ahead, 1024*1024}],
+    case file:open(Filename, ReadOpts) of
+        {ok, IODevice} -> new(IODevice, [{should_close, true}, {mode, read}]);
+        Error -> Error
+    end.
+
+new(IoDevice) -> % no options: the record defaults apply
+    new(IoDevice, []).
+
+%% Device :: io_device(), which should be opened in raw and binary mode.
+%% Opts   :: [{should_close, boolean()} | {mode, read | write}]
+%% Options not given keep the record defaults.
+new(Device, Opts) when is_list(Opts) ->
+    Initial = #t_file_transport{device = Device},
+    thrift_transport:new(?MODULE, parse_opts(Opts, Initial)).
+
+
+%% Parse options: fold each recognised option into the state record.
+%% Recognised are {mode, read | write} and {should_close, boolean()};
+%% anything else fails with function_clause.
+parse_opts([], State) ->
+    State;
+parse_opts([{mode, Mode} | Rest], State) when Mode =:= read; Mode =:= write ->
+    parse_opts(Rest, State#t_file_transport{mode = Mode});
+parse_opts([{should_close, Flag} | Rest], State) when is_boolean(Flag) ->
+    parse_opts(Rest, State#t_file_transport{should_close = Flag}).
+
+
+%%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+write(#t_file_transport{mode = write, device = Device}, Data) -> % write mode only
+    file:write(Device, Data);
+write(_Transport, _Data) ->
+    {error, read_mode}.
+
+%% NOTE(review): the fallback also serves write-mode transports yet reports read_mode - confirm.
+read(#t_file_transport{mode = read, device = Device}, Len)
+  when is_integer(Len), Len >= 0 ->
+    file:read(Device, Len);
+read(_Transport, _Len) ->
+    {error, read_mode}.
+
+flush(#t_file_transport{mode = write, device = Device}) -> % only write mode is matched
+    file:sync(Device).
+
+%% Closes the underlying device when should_close is true; otherwise the
+%% device is left open.
+close(#t_file_transport{should_close = ShouldClose, device = Device}) ->
+    case ShouldClose of
+        false -> ok;
+        true -> file:close(Device)
+    end.
diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl
new file mode 100644
index 0000000..01bab70
--- /dev/null
+++ b/lib/erl/src/thrift_framed_transport.erl
@@ -0,0 +1,208 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_framed_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(framed_transport, {wrapped, % a thrift_transport that frames are read from / written to
+ read_buffer, % iolist(): unread remainder of the last frame read
+ write_buffer % iolist(): data written since the last flush
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new(WrappedTransport) -> Transport | ignore | {error,Error}
+%% Description: Starts a linked gen_server that frames traffic for
+%% WrappedTransport and wraps its pid with thrift_transport:new/2. Any
+%% other result of gen_server:start_link/3 is returned as is.
+%%--------------------------------------------------------------------
+new(WrappedTransport) ->
+    case gen_server:start_link(?MODULE, [WrappedTransport], []) of
+        {ok, Pid} -> thrift_transport:new(?MODULE, Pid);
+        Failure -> Failure
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Appends Data to the write buffer held by the server;
+%% nothing is sent to the wrapped transport until a flush.
+%%--------------------------------------------------------------------
+write(Transport, Data) -> gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> Response
+%%
+%% Description: Sends the buffered writes through the wrapped transport as
+%% one frame; Response is the wrapped transport's write result.
+%%--------------------------------------------------------------------
+flush(Transport) -> gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Asynchronously asks the server to flush, stop, and close
+%% the wrapped transport. Returns without waiting for that to happen.
+%%--------------------------------------------------------------------
+close(Transport) -> gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads up to Len bytes of frame payload that the server
+%% obtained from the wrapped transport
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) -> gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([Wrapped]) -> {ok, State}
+%% Description: Initiates the server with the transport to wrap and
+%% empty read and write buffers.
+%%--------------------------------------------------------------------
+init([Wrapped]) ->
+    Empty = [],
+    State = #framed_transport{wrapped = Wrapped,
+                              read_buffer = Empty,
+                              write_buffer = Empty},
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
+%% Description: Handling call messages
+%%
+%% {write, Data}  appends Data to the write buffer; nothing reaches the
+%%                wrapped transport until a flush.
+%% {read, Len}    replies {ok, Bin} with at most Len bytes of the current
+%%                frame, reading a new frame from the wrapped transport
+%%                first when the read buffer is empty.
+%% flush          sends the buffered data as one length-prefixed frame.
+%%
+%% A failed read on the wrapped transport fails the {ok, _} matches below
+%% and therefore crashes the server.
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = Pending}) ->
+    {reply, ok, State#framed_transport{write_buffer = [Pending, Data]}};
+
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+                                                          read_buffer = RBuf}) ->
+    {Available, AvailableSize} =
+        case iolist_size(RBuf) of
+            0 ->
+                %% Buffer drained: a frame is a 4-byte big-endian signed
+                %% length followed by that many bytes of payload.
+                {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+                    thrift_transport:read(Wrapped, 4),
+                {ok, Frame} = thrift_transport:read(Wrapped, FrameLen),
+                {Frame, erlang:byte_size(Frame)};
+            Size ->
+                {RBuf, Size}
+        end,
+
+    %% Hand out Give bytes and keep whatever is left for the next read.
+    Give = min(AvailableSize, Len),
+    <<Data:Give/binary, Leftover/binary>> = iolist_to_binary(Available),
+    Response = {ok, Data},
+    State1 = State#framed_transport{read_buffer = Leftover},
+    {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+    {Response, Flushed} = do_flush(State),
+    {reply, Response, Flushed}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {stop, normal, State}
+%% Description: Handling cast messages. `close' flushes buffered writes and
+%% stops the server (terminate/2 then closes the wrapped transport); any
+%% other cast is ignored.
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+    %% Stop with the post-flush state, whose write buffer is already empty.
+    {_Response, Flushed} = do_flush(State),
+    {stop, normal, Flushed};
+handle_cast(_Msg, State = #framed_transport{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State}
+%%
+%% Description: Handling all non call/cast messages. Every such message
+%% is dropped and the state is left untouched.
+%%--------------------------------------------------------------------
+handle_info(_Unexpected, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> ok
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. Closes the wrapped transport, whatever the reason for
+%% stopping.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, #framed_transport{wrapped = Wrapped}) ->
+    thrift_transport:close(Wrapped),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Code-upgrade hook; the state is carried over unchanged.
+%%--------------------------------------------------------------------
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%% Sends the write buffer to the wrapped transport as a single frame (4-byte
+%% big-endian length, then the payload), flushes the wrapped transport and
+%% empties the buffer. Returns {WriteResult, NewState}. An empty buffer is
+%% still sent, as a frame of length 0.
+do_flush(State = #framed_transport{write_buffer = Buffer, wrapped = Wrapped}) ->
+    LengthPrefix = <<(iolist_size(Buffer)):32/integer-signed-big>>,
+    WriteResult = thrift_transport:write(Wrapped, [LengthPrefix, Buffer]),
+    %% The result of flushing the wrapped transport is not checked.
+    thrift_transport:flush(Wrapped),
+
+    {WriteResult, State#framed_transport{write_buffer = []}}.
+
+min(X, Y) when Y > X -> X; % a strictly smaller first argument wins
+min(_X, Y) -> Y.           % otherwise (ties included) the second
+
diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl
new file mode 100644
index 0000000..f8c1827
--- /dev/null
+++ b/lib/erl/src/thrift_http_transport.erl
@@ -0,0 +1,199 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_http_transport).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/2, new/3]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(http_transport, {host, % string(), appended to "http://" to build the URL
+ path, % string(), appended after the host
+ read_buffer, % iolist(): response bodies not yet read
+ write_buffer, % iolist(): data written since the last flush
+ http_options, % see http(3)
+ extra_headers % [{str(), str()}, ...] sent with every request
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: new(Host, Path) -> same result as new(Host, Path, [])
+%% Description: Starts the server with an empty option list
+%%--------------------------------------------------------------------
+new(Host, Path) ->
+    new(Host, Path, []).
+
+%%--------------------------------------------------------------------
+%% Starts a linked server process for the transport.
+%% Options include:
+%%   {http_options, HttpOptions}   = See http(3)
+%%   {extra_headers, ExtraHeaders} = List of extra HTTP headers
+%% Any other option makes init/1 stop with {invalid_option, Option}.
+%%--------------------------------------------------------------------
+new(Host, Path, Options) ->
+    case gen_server:start_link(?MODULE, {Host, Path, Options}, []) of
+        {ok, Pid} -> thrift_transport:new(?MODULE, Pid);
+        Failure -> Failure
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Appends Data to the write buffer held by the server;
+%% no request is made until a flush.
+%%--------------------------------------------------------------------
+write(Transport, Data) -> gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Flushes the buffer, making one HTTP POST request whose
+%% response body is queued for later reads
+%%--------------------------------------------------------------------
+flush(Transport) -> gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Asynchronously asks the server to flush and stop.
+%% Returns without waiting for that to happen.
+%%--------------------------------------------------------------------
+close(Transport) -> gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Reads up to Len bytes of the HTTP response data that
+%% earlier flushes have queued in the read buffer
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) -> gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%% Builds the initial state from Host and Path, then folds Options into it.
+%% The first unrecognised option turns the accumulator into
+%% {invalid_option, Option}, which is carried to the end and stops the server.
+init({Host, Path, Options}) ->
+    Defaults = #http_transport{host = Host,
+                               path = Path,
+                               read_buffer = [],
+                               write_buffer = [],
+                               http_options = [],
+                               extra_headers = []},
+    Apply =
+        fun({http_options, HttpOpts}, Acc = #http_transport{}) ->
+                Acc#http_transport{http_options = HttpOpts};
+           ({extra_headers, Headers}, Acc = #http_transport{}) ->
+                Acc#http_transport{extra_headers = Headers};
+           (Unknown, #http_transport{}) ->
+                {invalid_option, Unknown};
+           (_Option, Failed) ->
+                Failed
+        end,
+    case lists:foldl(Apply, Defaults, Options) of
+        Final = #http_transport{} -> {ok, Final};
+        Invalid -> {stop, Invalid}
+    end.
+
+%% {write, Data}  buffers Data for the next request.
+%% {read, Len}    hands out up to Len bytes of buffered response data.
+%% flush          performs the HTTP request for the buffered data.
+handle_call({write, Data}, _From, State = #http_transport{write_buffer = Pending}) ->
+    {reply, ok, State#http_transport{write_buffer = [Pending, Data]}};
+
+handle_call({read, Len}, _From, State = #http_transport{read_buffer = RBuf}) ->
+    Give = min(iolist_size(RBuf), Len),
+    case iolist_to_binary(RBuf) of
+        <<Data:Give/binary, Remaining/binary>> ->
+            {reply, {ok, Data}, State#http_transport{read_buffer = Remaining}};
+        _ ->
+            %% Only reached when the size spec cannot match, e.g. Len < 0.
+            {reply, {error, 'EOF'}, State}
+    end;
+handle_call(flush, _From, State) ->
+    {Response, Flushed} = do_flush(State),
+    {reply, Response, Flushed}.
+
+handle_cast(close, State) -> % flush pending writes, then stop
+    {_Response, Flushed} = do_flush(State),
+    {stop, normal, Flushed};
+
+handle_cast(_Other, State = #http_transport{}) ->
+    {noreply, State}.
+
+handle_info(_Unexpected, State) -> % stray messages are dropped
+    {noreply, State}.
+
+terminate(_Why, _Final) -> % nothing is released here
+    ok.
+
+code_change(_FromVsn, State, _Extra) -> % the state is carried over as is
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%% Sends the buffered writes as the body of one HTTP POST to
+%% "http://" ++ Host ++ Path and appends the response body to the read buffer.
+%% Anything but a 200 response fails the match below and crashes the
+%% calling server.
+%% NOTE(review): newer OTP releases provide this client as `httpc' rather
+%% than `http' -- confirm which releases must be supported.
+do_flush(State = #http_transport{host = Host,
+                                 path = Path,
+                                 read_buffer = Rbuf,
+                                 write_buffer = Wbuf,
+                                 http_options = HttpOptions,
+                                 extra_headers = ExtraHeaders}) ->
+    case iolist_to_binary(Wbuf) of
+        <<>> ->
+            %% Don't bother flushing empty buffers.
+            {ok, State};
+        WBinary ->
+            Url = "http://" ++ Host ++ Path,
+            Headers = [{"User-Agent", "Erlang/thrift_http_transport"} | ExtraHeaders],
+            Request = {Url, Headers, "application/x-thrift", WBinary},
+            {ok, {{_Version, 200, _ReasonPhrase}, _RespHeaders, Body}} =
+                http:request(post, Request, HttpOptions, [{body_format, binary}]),
+            {ok, State#http_transport{read_buffer = [Rbuf, Body], write_buffer = []}}
+    end.
+
+min(X, Y) when Y > X -> X; % a strictly smaller first argument wins
+min(_X, Y) -> Y.           % otherwise (ties included) the second
diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl
new file mode 100644
index 0000000..b4f607a
--- /dev/null
+++ b/lib/erl/src/thrift_memory_buffer.erl
@@ -0,0 +1,164 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_memory_buffer).
+
+-behaviour(gen_server).
+-behaviour(thrift_transport).
+
+%% API
+-export([new/0, new_transport_factory/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% thrift_transport callbacks
+-export([write/2, read/2, flush/1, close/1]).
+
+-record(memory_buffer, {buffer}). % buffer: iolist() of data written but not yet read
+
+%%====================================================================
+%% API
+%%====================================================================
+new() -> % start a linked buffer server and wrap its pid as a transport
+    case gen_server:start_link(?MODULE, [], []) of
+        {ok, Pid} ->
+            thrift_transport:new(?MODULE, Pid);
+        Failure ->
+            Failure
+    end.
+
+new_transport_factory() -> % each call of the returned fun makes a fresh buffer
+    {ok, fun new/0}.
+
+%%--------------------------------------------------------------------
+%% Function: write(Transport, Data) -> ok
+%%
+%% Data = iolist()
+%%
+%% Description: Appends Data to the in-memory buffer held by the
+%% server
+%%--------------------------------------------------------------------
+write(Transport, Data) -> gen_server:call(Transport, {write, Data}).
+
+%%--------------------------------------------------------------------
+%% Function: flush(Transport) -> ok
+%%
+%% Description: Does nothing to the buffer; the server simply replies
+%% ok
+%%--------------------------------------------------------------------
+flush(Transport) -> gen_server:call(Transport, flush).
+
+%%--------------------------------------------------------------------
+%% Function: close(Transport) -> ok
+%%
+%% Description: Asynchronously asks the buffer server to stop.
+%% Returns without waiting for that to happen.
+%%--------------------------------------------------------------------
+close(Transport) -> gen_server:cast(Transport, close).
+
+%%--------------------------------------------------------------------
+%% Function: read(Transport, Len) -> {ok, Data}
+%%
+%% Data = binary()
+%%
+%% Description: Removes up to Len bytes from the front of the buffer
+%% and returns them
+%%--------------------------------------------------------------------
+read(Transport, Len) when is_integer(Len) -> gen_server:call(Transport, {read, Len}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([]) -> {ok, State}
+%%
+%% Description: Initiates the server with an empty buffer. No other
+%% state is kept.
+%%--------------------------------------------------------------------
+init([]) ->
+    EmptyBuffer = [],
+    {ok, #memory_buffer{buffer = EmptyBuffer}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State}
+%% Description: Handling call messages
+%%
+%% {write, Data}  appends Data to the buffer.
+%% {read, Len}    removes up to Len bytes from the front of the buffer and
+%%                replies {ok, Bytes}.
+%% flush          does nothing and replies ok.
+%%--------------------------------------------------------------------
+handle_call({write, Data}, _From, State = #memory_buffer{buffer = Buf}) ->
+    {reply, ok, State#memory_buffer{buffer = [Buf, Data]}};
+
+handle_call({read, Len}, _From, State = #memory_buffer{buffer = Buf}) ->
+    Contents = iolist_to_binary(Buf),
+    Give = min(byte_size(Contents), Len),
+    {Taken, Rest} = split_binary(Contents, Give),
+    {reply, {ok, Taken}, State#memory_buffer{buffer = Rest}};
+
+handle_call(flush, _From, State) ->
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {stop, normal, State}
+%% Description: Handling cast messages: `close' stops the server, any
+%% other cast is ignored.
+%%--------------------------------------------------------------------
+handle_cast(close, State) ->
+    {stop, normal, State};
+handle_cast(_Other, State = #memory_buffer{}) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State}
+%%
+%% Description: Handling all non call/cast messages. Every such message
+%% is dropped and the state is left untouched.
+%%--------------------------------------------------------------------
+handle_info(_Unexpected, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> ok
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. The buffer lives only in the server state, so there is
+%% nothing to clean up here.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Why, _Final) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Code-upgrade hook; the state is carried over unchanged.
+%%--------------------------------------------------------------------
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+min(X, Y) when Y > X -> X; % a strictly smaller first argument wins
+min(_X, Y) -> Y.           % otherwise (ties included) the second
+
diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl
new file mode 100644
index 0000000..e26fb33
--- /dev/null
+++ b/lib/erl/src/thrift_processor.erl
@@ -0,0 +1,188 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_processor).
+
+-export([init/1]).
+
+-include("thrift_constants.hrl").
+-include("thrift_protocol.hrl").
+
+-record(thrift_processor, {handler, in_protocol, out_protocol, service}).
+
+%% Entry point of a processor process.
+%% Takes {Server, ProtoGen, Service, Handler}. ProtoGen is a zero-arity
+%% fun that must return {ok, InProtocol, OutProtocol}. The Server
+%% element is accepted for the caller's convenience but is not used
+%% here. Once the protocols are built the request loop is entered and
+%% this call only returns (or exits) when that loop ends.
+init({_Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
+    {ok, IProt, OProt} = ProtoGen(),
+    loop(#thrift_processor{in_protocol  = IProt,
+                           out_protocol = OProt,
+                           service      = Service,
+                           handler      = Handler}).
+
+%% Request loop. Reads one message header at a time from the input
+%% protocol and dispatches CALL and ONEWAY messages to
+%% handle_function/2, then loops.
+%%   {error, timeout} - closes the output transport and returns ok.
+%%   {error, closed}  - closes the output transport and exits with
+%%                      reason shutdown.
+%% Any other read result is not handled and raises case_clause.
+loop(State = #thrift_processor{in_protocol  = IProto,
+                               out_protocol = OProto,
+                               service      = Service}) ->
+    case thrift_protocol:read(IProto, message_begin) of
+        #protocol_message_begin{name = Function, type = Type}
+          when Type =:= ?tMessageType_CALL;
+               Type =:= ?tMessageType_ONEWAY ->
+            ok = handle_function(State, function_atom(Service, Function)),
+            loop(State);
+        {error, timeout} ->
+            thrift_protocol:close_transport(OProto),
+            ok;
+        {error, closed} ->
+            thrift_protocol:close_transport(OProto),
+            exit(shutdown)
+    end.
+
+%% Converts a function name received from the peer into an atom.
+%% The name is untrusted input, so no new atom may be created for it:
+%% atoms are never garbage collected and a peer could otherwise fill the
+%% atom table. The service module is loaded first so that the atoms it
+%% mentions exist (presumably it names every function it serves in
+%% function_info/2 -- confirm against the generated code). A name with
+%% no existing atom raises badarg.
+function_atom(Service, Name) ->
+    _ = code:ensure_loaded(Service),
+    list_to_existing_atom(Name).
+
+%% Serves one request for Function. The parameter type is looked up
+%% through Service:function_info/2, the arguments are read from the
+%% input protocol and Handler:handle_function/2 is invoked. A normal
+%% result goes to handle_success/3. Anything raised by the handler or
+%% while sending the success reply is routed to
+%% handle_function_catch/4. The output protocol is flushed afterwards
+%% and the result of that flush (ok) is returned.
+handle_function(State=#thrift_processor{in_protocol  = IProto,
+                                        out_protocol = OProto,
+                                        handler      = Handler,
+                                        service      = Service},
+                Function) ->
+    ParamsType = Service:function_info(Function, params_type),
+    {ok, Args} = thrift_protocol:read(IProto, ParamsType),
+
+    try
+        handle_success(State, Function,
+                       Handler:handle_function(Function, Args))
+    catch
+        Class:Reason ->
+            handle_function_catch(State, Function, Class, Reason)
+    end,
+    after_reply(OProto).
+
+%% Decides what to do with an exception raised while serving Function.
+%%   - For a oneway function nothing can be sent back, so the failure is
+%%     only logged.
+%%   - A thrown non-empty tuple is treated as a declared Thrift
+%%     exception and passed to handle_exception/3.
+%%   - An exception of class error is reported through handle_error/3.
+%% Other combinations (e.g. exits) match no branch and raise
+%% case_clause.
+handle_function_catch(State = #thrift_processor{service = Service},
+                      Function, ErrType, ErrData) ->
+    ReplyType = Service:function_info(Function, reply_type),
+
+    case {ErrType, ErrData} of
+        _ when ReplyType =:= oneway_void ->
+            Trace = erlang:get_stacktrace(),
+            error_logger:warning_msg(
+              "oneway void ~p threw error which must be ignored: ~p",
+              [Function, {ErrType, ErrData, Trace}]),
+            ok;
+
+        {throw, Thrown} when is_tuple(Thrown), size(Thrown) > 0 ->
+            error_logger:warning_msg("~p threw exception: ~p~n", [Function, Thrown]),
+            handle_exception(State, Function, Thrown),
+            %% keep serving further requests from this client
+            ok;
+
+        {error, Reason} ->
+            ok = handle_error(State, Function, Reason)
+    end.
+
+%% Sends the reply for a handler call that returned normally.
+%%   {reply, Data} - Data is wrapped in the "<Function>_result" struct
+%%                   as field 0 and sent as a REPLY message.
+%%   ok            - for a function whose reply type is {struct, []},
+%%                   an empty result struct is sent; for a oneway
+%%                   function nothing is sent.
+%% Any other handler result raises case_clause.
+handle_success(#thrift_processor{out_protocol = OProto,
+                                 service      = Service},
+               Function,
+               Result) ->
+    ReplyType  = Service:function_info(Function, reply_type),
+    StructName = atom_to_list(Function) ++ "_result",
+
+    ok = case Result of
+             {reply, ReplyData} ->
+                 Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
+                 send_reply(OProto, Function, ?tMessageType_REPLY, Reply);
+
+             ok when ReplyType == {struct, []} ->
+                 send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}});
+
+             ok when ReplyType == oneway_void ->
+                 %% no reply for oneway void
+                 ok
+         end.
+
+%% Replies to the client with an exception thrown by the handler.
+%% The exception's type is the first element of the Exception tuple.
+%% Service:function_info(Function, exceptions) yields a spec of the
+%% form {struct, [{Fid, {struct, {Module, Type}}}, ...]}. The reply
+%% tuple has one slot per declared exception: the slot whose Type
+%% equals the thrown type carries Exception, all others are undefined.
+%% If no slot matches, the exception was not declared for Function and
+%% handle_unknown_exception/3 is used instead.
+handle_exception(State = #thrift_processor{out_protocol = OProto,
+                                           service      = Service},
+                 Function,
+                 Exception) ->
+    ThrownType = element(1, Exception),
+
+    ReplySpec = Service:function_info(Function, exceptions),
+    {struct, XInfo} = ReplySpec,
+    true = is_list(XInfo),
+
+    Slot = fun(Type) when Type =:= ThrownType -> Exception;
+              (_Other)                        -> undefined
+           end,
+    Slots = [Slot(Type) || {_Fid, {struct, {_Module, Type}}} <- XInfo],
+
+    case lists:any(fun(X) -> X =/= undefined end, Slots) of
+        true ->
+            ReplyTuple = list_to_tuple([Function | Slots]),
+            ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ReplyTuple});
+        false ->
+            ok = handle_unknown_exception(State, Function, Exception)
+    end.
+
+%%
+%% Used when the service explicitly threw an exception that is not among
+%% the exceptions declared for the function. It is reported to the
+%% client as a generic error tagged exception_not_declared_as_thrown.
+%%
+handle_unknown_exception(State, Function, Exception) ->
+    Undeclared = {exception_not_declared_as_thrown, Exception},
+    handle_error(State, Function, Undeclared).
+
+%% Logs Error together with the current stack trace and sends a
+%% TApplicationException of type UNKNOWN to the client. The message
+%% text includes the error and trace only when the thrift application
+%% env exceptions_include_traces is set to true; otherwise a fixed
+%% generic text is sent.
+handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) ->
+    Trace = erlang:get_stacktrace(),
+    error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Trace}]),
+
+    IncludeTrace = application:get_env(thrift, exceptions_include_traces),
+    Text = if
+               IncludeTrace =:= {ok, true} ->
+                   lists:flatten(io_lib:format("An error occurred: ~p~n",
+                                               [{Error, Trace}]));
+               true ->
+                   "An unknown handler error occurred."
+           end,
+    send_reply(OProto, Function, ?tMessageType_EXCEPTION,
+               {?TApplicationException_Structure,
+                #'TApplicationException'{
+                   message = Text,
+                   type    = ?TApplicationException_UNKNOWN}}).
+
+%% Writes a complete reply message: a message header named after
+%% Function (with the given message type and seqid 0), the Reply
+%% payload and the message end marker, then flushes the transport.
+%% Every step must return ok, otherwise badmatch is raised.
+send_reply(OProto, Function, ReplyMessageType, Reply) ->
+    Header = #protocol_message_begin{name  = atom_to_list(Function),
+                                     type  = ReplyMessageType,
+                                     seqid = 0},
+    lists:foreach(fun(Part) ->
+                          ok = thrift_protocol:write(OProto, Part)
+                  end,
+                  [Header, Reply, message_end]),
+    ok = thrift_protocol:flush_transport(OProto),
+    ok.
+
+%% Runs after each request has been handled: flushes the output
+%% transport. The transport is left open.
+after_reply(OProto) ->
+    ok = thrift_protocol:flush_transport(OProto).
diff --git a/lib/erl/src/thrift_protocol.erl b/lib/erl/src/thrift_protocol.erl
new file mode 100644
index 0000000..1bfb0a4
--- /dev/null
+++ b/lib/erl/src/thrift_protocol.erl
@@ -0,0 +1,356 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_protocol).
+
+-export([new/2,
+ write/2,
+ read/2,
+ read/3,
+ skip/2,
+ flush_transport/1,
+ close_transport/1,
+ typeid_to_atom/1
+ ]).
+
+-export([behaviour_info/1]).
+
+-include("thrift_constants.hrl").
+-include("thrift_protocol.hrl").
+
+-record(protocol, {module, data}).
+
+%% Behaviour definition: lists the callbacks a protocol implementation
+%% module has to export. Any other query yields undefined.
+behaviour_info(callbacks) ->
+    [{read, 2}, {write, 2}, {flush_transport, 1}, {close_transport, 1}];
+behaviour_info(_Other) ->
+    undefined.
+
+%% Wraps an implementation module and its private data in a #protocol{}
+%% record and returns {ok, Protocol}.
+new(Module, Data) when is_atom(Module) ->
+    Protocol = #protocol{module = Module, data = Data},
+    {ok, Protocol}.
+
+%% Delegates the flush to the implementation module, passing it its
+%% own data.
+flush_transport(#protocol{module = Impl, data = ImplData}) ->
+    Impl:flush_transport(ImplData).
+
+%% Delegates the close to the implementation module, passing it its
+%% own data.
+close_transport(#protocol{module = Impl, data = ImplData}) ->
+    Impl:close_transport(ImplData).
+
+%% Maps a wire type id (the ?tType_* constants) to the atom naming that
+%% type. Container and struct ids map to the bare atoms struct, map,
+%% set and list. An id not listed here raises function_clause.
+typeid_to_atom(?tType_STOP) -> field_stop;
+typeid_to_atom(?tType_VOID) -> void;
+typeid_to_atom(?tType_BOOL) -> bool;
+typeid_to_atom(?tType_BYTE) -> byte;
+typeid_to_atom(?tType_DOUBLE) -> double;
+typeid_to_atom(?tType_I16) -> i16;
+typeid_to_atom(?tType_I32) -> i32;
+typeid_to_atom(?tType_I64) -> i64;
+typeid_to_atom(?tType_STRING) -> string;
+typeid_to_atom(?tType_STRUCT) -> struct;
+typeid_to_atom(?tType_MAP) -> map;
+typeid_to_atom(?tType_SET) -> set;
+typeid_to_atom(?tType_LIST) -> list.
+
+%% Maps a type description term to its wire type id (a ?tType_*
+%% constant). Base types are plain atoms; structs and containers are
+%% tuples ({struct, _}, {map, _, _}, {set, _}, {list, _}) of which only
+%% the tag is inspected. Any other term raises function_clause.
+term_to_typeid(void) -> ?tType_VOID;
+term_to_typeid(bool) -> ?tType_BOOL;
+term_to_typeid(byte) -> ?tType_BYTE;
+term_to_typeid(double) -> ?tType_DOUBLE;
+term_to_typeid(i16) -> ?tType_I16;
+term_to_typeid(i32) -> ?tType_I32;
+term_to_typeid(i64) -> ?tType_I64;
+term_to_typeid(string) -> ?tType_STRING;
+term_to_typeid({struct, _}) -> ?tType_STRUCT;
+term_to_typeid({map, _, _}) -> ?tType_MAP;
+term_to_typeid({set, _}) -> ?tType_SET;
+term_to_typeid({list, _}) -> ?tType_LIST.
+
+%% read(IProto, {struct, Structure}, Tag) -> {ok, Tuple}
+%%
+%% Reads a struct whose definition is a list like [{Fid, Type}, ...].
+%% The result is a tuple with one element per defined field, in
+%% definition order; fields that were not present stay undefined.
+%% When Tag is not undefined the tuple is tagged: Tag becomes element 1
+%% and the fields follow it.
+%%
+%% The struct_end marker is consumed after the fields so that reading
+%% mirrors write/2 and skip/2, which both handle struct_begin and
+%% struct_end as a pair.
+read(IProto, {struct, Structure}, Tag)
+  when is_list(Structure), is_atom(Tag) ->
+
+    % If we want a tagged tuple, we need to offset all the tuple indices
+    % by 1 to avoid overwriting the tag.
+    Offset = if Tag =/= undefined -> 1; true -> 0 end,
+    IndexList = case length(Structure) of
+                    N when N > 0 -> lists:seq(1 + Offset, N + Offset);
+                    _ -> []
+                end,
+
+    SWithIndices = [{Fid, {Type, Index}} ||
+                       {{Fid, Type}, Index} <-
+                           lists:zip(Structure, IndexList)],
+    % Fid -> {Type, Index}
+    SDict = dict:from_list(SWithIndices),
+
+    ok = read(IProto, struct_begin),
+    RTuple0 = erlang:make_tuple(length(Structure) + Offset, undefined),
+    RTuple1 = if Tag =/= undefined -> setelement(1, RTuple0, Tag);
+                 true              -> RTuple0
+              end,
+
+    RTuple2 = read_struct_loop(IProto, SDict, RTuple1),
+    ok = read(IProto, struct_end),
+    {ok, RTuple2}.
+
+%% read(IProto, Type) -> {ok, Value} for structs and containers;
+%% for anything else, whatever the implementation module's read/2
+%% returns.
+%%
+%%   {struct, {Module, Name}} - the definition is fetched from
+%%                              Module:struct_info(Name) and the result
+%%                              is a tuple tagged with Name.
+%%   {struct, FieldList}      - untagged tuple.
+%%   {list, Type}             - Erlang list.
+%%   {map, KeyType, ValType}  - dict.
+%%   {set, Type}              - sets set.
+%%
+%% Every container element must read as {ok, Value}. A failed element
+%% read raises badmatch instead of being dropped, because dropping it
+%% would return a short container while the stream is out of step.
+read(IProto, {struct, {Module, StructureName}}) when is_atom(Module),
+                                                     is_atom(StructureName) ->
+    read(IProto, Module:struct_info(StructureName), StructureName);
+
+read(IProto, S={struct, Structure}) when is_list(Structure) ->
+    read(IProto, S, undefined);
+
+read(IProto, {list, Type}) ->
+    #protocol_list_begin{size = Size} = read(IProto, list_begin),
+    List = read_elements(IProto, Type, Size),
+    ok = read(IProto, list_end),
+    {ok, List};
+
+read(IProto, {map, KeyType, ValType}) ->
+    #protocol_map_begin{size = Size} = read(IProto, map_begin),
+    % The key is read before its value, explicitly.
+    List = [begin
+                {ok, Key} = read(IProto, KeyType),
+                {ok, Val} = read(IProto, ValType),
+                {Key, Val}
+            end || _X <- lists:duplicate(Size, 0)],
+    ok = read(IProto, map_end),
+    {ok, dict:from_list(List)};
+
+read(IProto, {set, Type}) ->
+    #protocol_set_begin{size = Size} = read(IProto, set_begin),
+    List = read_elements(IProto, Type, Size),
+    ok = read(IProto, set_end),
+    {ok, sets:from_list(List)};
+
+read(#protocol{module = Module,
+               data = ModuleData}, ProtocolType) ->
+    Module:read(ModuleData, ProtocolType).
+
+%% Reads Size consecutive values of Type and returns them in order.
+read_elements(IProto, Type, Size) ->
+    [begin
+         {ok, Elem} = read(IProto, Type),
+         Elem
+     end || _X <- lists:duplicate(Size, 0)].
+
+%% Reads struct fields until the stop field and returns the filled
+%% result tuple. SDict maps a field id to {Type, Index}, where Index is
+%% the position in RTuple that receives the value.
+%% A field whose id is not in SDict, or whose wire type differs from the
+%% declared type, is logged and skipped.
+read_struct_loop(IProto, SDict, RTuple) ->
+    #protocol_field_begin{type = FType, id = Fid} =
+        thrift_protocol:read(IProto, field_begin),
+    case {FType, Fid} of
+        {?tType_STOP, _} ->
+            RTuple;
+        _Else ->
+            case dict:find(Fid, SDict) of
+                {ok, {Type, Index}} ->
+                    case term_to_typeid(Type) of
+                        FType ->
+                            % wire type agrees with the declared type
+                            {ok, Val} = read(IProto, Type),
+                            thrift_protocol:read(IProto, field_end),
+                            NewRTuple = setelement(Index, RTuple, Val),
+                            read_struct_loop(IProto, SDict, NewRTuple);
+                        Expected ->
+                            error_logger:info_msg(
+                              "Skipping field ~p with wrong type (~p != ~p)~n",
+                              [Fid, FType, Expected]),
+                            skip_field(FType, IProto, SDict, RTuple)
+                    end;
+                _Else2 ->
+                    error_logger:info_msg("Skipping field ~p with unknown fid~n", [Fid]),
+                    skip_field(FType, IProto, SDict, RTuple)
+            end
+    end.
+
+%% Discards the value of the current field (given by its wire type id),
+%% consumes the field end marker and resumes reading the struct.
+skip_field(FType, IProto, SDict, RTuple) ->
+    thrift_protocol:skip(IProto, thrift_protocol:typeid_to_atom(FType)),
+    read(IProto, field_end),
+    read_struct_loop(IProto, SDict, RTuple).
+
+
+%% skip(Proto, Type) -> ok
+%%
+%% Reads and discards one value of the given type. Type is either the
+%% atom naming the type (struct, map, set, list or a base type) or the
+%% numeric wire type id. The skip_*_loop helpers pass the ids found in
+%% field and container headers, so ids are translated with
+%% typeid_to_atom/1 here; without that clause they match nothing and
+%% raise function_clause.
+skip(Proto, struct) ->
+    ok = read(Proto, struct_begin),
+    ok = skip_struct_loop(Proto),
+    ok = read(Proto, struct_end);
+
+skip(Proto, map) ->
+    Map = read(Proto, map_begin),
+    ok = skip_map_loop(Proto, Map),
+    ok = read(Proto, map_end);
+
+skip(Proto, set) ->
+    Set = read(Proto, set_begin),
+    ok = skip_set_loop(Proto, Set),
+    ok = read(Proto, set_end);
+
+skip(Proto, list) ->
+    List = read(Proto, list_begin),
+    ok = skip_list_loop(Proto, List),
+    ok = read(Proto, list_end);
+
+skip(Proto, Type) when is_atom(Type) ->
+    % base type: read the value and throw it away
+    _Ignore = read(Proto, Type),
+    ok;
+
+skip(Proto, TypeId) when is_integer(TypeId) ->
+    skip(Proto, typeid_to_atom(TypeId)).
+
+
+%% Discards the fields of a struct one by one until the stop field is
+%% read. The type taken from each field header is passed to skip/2
+%% as is.
+skip_struct_loop(Proto) ->
+    #protocol_field_begin{type = FieldType} = read(Proto, field_begin),
+    if
+        FieldType =:= ?tType_STOP ->
+            ok;
+        true ->
+            skip(Proto, FieldType),
+            ok = read(Proto, field_end),
+            skip_struct_loop(Proto)
+    end.
+
+%% Discards the remaining entries of a map. The size field of the map
+%% header is used as a countdown: each round skips one key and one
+%% value and recurses with size decremented, until it reaches 0.
+skip_map_loop(Proto, Header = #protocol_map_begin{ktype = Ktype,
+                                                  vtype = Vtype,
+                                                  size  = Size}) ->
+    case Size of
+        0 ->
+            ok;
+        Remaining when Remaining > 0 ->
+            skip(Proto, Ktype),
+            skip(Proto, Vtype),
+            skip_map_loop(Proto,
+                          Header#protocol_map_begin{size = Remaining - 1})
+    end.
+
+%% Discards the remaining elements of a set, using the size field of
+%% the set header as a countdown to 0.
+skip_set_loop(Proto, Header = #protocol_set_begin{etype = Etype,
+                                                  size  = Size}) ->
+    case Size of
+        0 ->
+            ok;
+        Remaining when Remaining > 0 ->
+            skip(Proto, Etype),
+            skip_set_loop(Proto,
+                          Header#protocol_set_begin{size = Remaining - 1})
+    end.
+
+%% Discards the remaining elements of a list, using the size field of
+%% the list header as a countdown to 0.
+skip_list_loop(Proto, Header = #protocol_list_begin{etype = Etype,
+                                                    size  = Size}) ->
+    case Size of
+        0 ->
+            ok;
+        Remaining when Remaining > 0 ->
+            skip(Proto, Etype),
+            skip_list_loop(Proto,
+                           Header#protocol_list_begin{size = Remaining - 1})
+    end.
+
+
+%%--------------------------------------------------------------------
+%% Function: write(OProto, {Type, Data}) -> ok
+%%
+%% Type = {struct, StructDef} |
+%%        {struct, {Module, StructureName}} |
+%%        {list, Type} |
+%%        {map, KeyType, ValType} |
+%%        {set, Type} |
+%%        BaseType
+%%
+%% Data =
+%%         tuple()      -- for struct (first element is the struct name)
+%%       | list()       -- for list
+%%       | dictionary() -- for map
+%%       | set()        -- for set
+%%       | term()       -- for base types
+%%
+%% Description: Serialises Data according to Type. Structs and
+%% containers are broken down here into begin/element/end writes;
+%% everything else is handed to the implementation module's write/2.
+%%--------------------------------------------------------------------
+write(Proto, {{struct, StructDef}, Data})
+  when is_list(StructDef), is_tuple(Data), length(StructDef) == size(Data) - 1 ->
+
+    [StructName | Elems] = tuple_to_list(Data),
+    ok = write(Proto, #protocol_struct_begin{name = StructName}),
+    ok = struct_write_loop(Proto, StructDef, Elems),
+    ok = write(Proto, struct_end),
+    ok;
+
+write(Proto, {{struct, {Module, StructureName}}, Data})
+  when is_atom(Module),
+       is_atom(StructureName),
+       element(1, Data) =:= StructureName ->
+    % resolve the named struct to its field list once, then write it
+    StructType = Module:struct_info(StructureName),
+    write(Proto, {StructType, Data});
+
+write(Proto, {{list, Type}, Data})
+  when is_list(Data) ->
+    ok = write(Proto,
+               #protocol_list_begin{
+                 etype = term_to_typeid(Type),
+                 size = length(Data)
+                }),
+    lists:foreach(fun(Elem) ->
+                          ok = write(Proto, {Type, Elem})
+                  end,
+                  Data),
+    ok = write(Proto, list_end),
+    ok;
+
+write(Proto, {{map, KeyType, ValType}, Data}) ->
+    ok = write(Proto,
+               #protocol_map_begin{
+                 ktype = term_to_typeid(KeyType),
+                 vtype = term_to_typeid(ValType),
+                 size  = dict:size(Data)
+                }),
+    dict:fold(fun(KeyData, ValData, _Acc) ->
+                      ok = write(Proto, {KeyType, KeyData}),
+                      ok = write(Proto, {ValType, ValData})
+              end,
+              _AccO = ok,
+              Data),
+    ok = write(Proto, map_end),
+    ok;
+
+write(Proto, {{set, Type}, Data}) ->
+    true = sets:is_set(Data),
+    ok = write(Proto,
+               #protocol_set_begin{
+                 etype = term_to_typeid(Type),
+                 size  = sets:size(Data)
+                }),
+    sets:fold(fun(Elem, _Acc) ->
+                      ok = write(Proto, {Type, Elem})
+              end,
+              _Acc0 = ok,
+              Data),
+    ok = write(Proto, set_end),
+    ok;
+
+write(#protocol{module = Module,
+                data = ModuleData}, Data) ->
+    Module:write(ModuleData, Data).
+
+%% Writes the fields of a struct. The field definitions and the field
+%% values are walked in parallel; a value of undefined means the field
+%% is absent and nothing is written for it. After the last field the
+%% stop marker is written.
+struct_write_loop(Proto, [], []) ->
+    ok = write(Proto, field_stop),
+    ok;
+struct_write_loop(Proto, [{_Fid, _Type} | MoreDefs], [undefined | MoreValues]) ->
+    % null fields are skipped in response
+    struct_write_loop(Proto, MoreDefs, MoreValues);
+struct_write_loop(Proto, [{Fid, Type} | MoreDefs], [Value | MoreValues]) ->
+    FieldHeader = #protocol_field_begin{type = term_to_typeid(Type),
+                                        id   = Fid},
+    ok = write(Proto, FieldHeader),
+    ok = write(Proto, {Type, Value}),
+    ok = write(Proto, field_end),
+    struct_write_loop(Proto, MoreDefs, MoreValues).
diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl
new file mode 100644
index 0000000..5d0012b
--- /dev/null
+++ b/lib/erl/src/thrift_server.erl
@@ -0,0 +1,183 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_server).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/3, stop/1, take_socket/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {listen_socket, acceptor_ref, service, handler}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link(Port, Service, HandlerModule) ->
+%%               {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server, registered locally under the module
+%% name, listening on Port.
+%%--------------------------------------------------------------------
+start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) ->
+    InitArg = {Port, Service, HandlerModule},
+    gen_server:start_link({local, ?SERVER}, ?MODULE, InitArg, []).
+
+%%--------------------------------------------------------------------
+%% Function: stop(Pid) -> ok
+%% Description: Asks the server process to stop, with a synchronous
+%% call.
+%%--------------------------------------------------------------------
+stop(ServerPid) when is_pid(ServerPid) ->
+    gen_server:call(ServerPid, stop).
+
+
+%% Asks Server to make the calling process the controlling process of
+%% Socket. Returns the result of that hand-over.
+take_socket(Server, Socket) ->
+    Request = {take_socket, Socket},
+    gen_server:call(Server, Request).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init({Port, Service, Handler}) -> {ok, State}
+%% Description: Opens the listen socket on Port and starts the first
+%% asynchronous accept. Failure of either step raises badmatch.
+%%--------------------------------------------------------------------
+init({Port, Service, Handler}) ->
+    ListenOpts = [binary,
+                  {packet, 0},
+                  {active, false},
+                  {nodelay, true},
+                  {reuseaddr, true}],
+    {ok, ListenSocket} = gen_tcp:listen(Port, ListenOpts),
+    {ok, AcceptRef} = prim_inet:async_accept(ListenSocket, -1),
+    InitialState = #state{listen_socket = ListenSocket,
+                          acceptor_ref  = AcceptRef,
+                          service       = Service,
+                          handler       = Handler},
+    {ok, InitialState}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) ->
+%%               {reply, Reply, State} | {stop, Reason, Reply, State}
+%% Description: Handling call messages.
+%%   stop                  - replies ok and stops with reason stopped.
+%%   {take_socket, Socket} - makes the calling process the controlling
+%%                           process of Socket and replies with the
+%%                           result.
+%%--------------------------------------------------------------------
+handle_call(stop, _From, State) ->
+    {stop, stopped, ok, State};
+
+handle_call({take_socket, Socket}, {Caller, _Tag}, State) ->
+    {reply, gen_tcp:controlling_process(Socket, Caller), State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State}
+%% Description: Handling cast messages. No casts are defined for this
+%% server; whatever arrives is ignored.
+%%--------------------------------------------------------------------
+handle_cast(_Request, ServerState) ->
+    {noreply, ServerState}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages.
+%%   A successful inet_async accept on our listen socket, carrying the
+%%   reference we are waiting for, copies the socket options to the new
+%%   client socket, starts a processor for it and issues the next
+%%   asynchronous accept. If the options cannot be set the server
+%%   stops.
+%%   Any other inet_async message is treated as an acceptor error and
+%%   stops the server.
+%%   Everything else is ignored.
+%%--------------------------------------------------------------------
+handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}},
+            State = #state{listen_socket = ListenSocket,
+                           acceptor_ref  = Ref,
+                           service       = Service,
+                           handler       = Handler}) ->
+    case set_sockopt(ListenSocket, ClientSocket) of
+        ok ->
+            %% New client connected - start processor
+            start_processor(ClientSocket, Service, Handler),
+            {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1),
+            {noreply, State#state{acceptor_ref = NewRef}};
+        {error, Reason} ->
+            error_logger:error_msg("Couldn't set socket opts: ~p~n",
+                                   [Reason]),
+            {stop, Reason, State}
+    end;
+
+handle_info({inet_async, _ListenSocket, _Ref, Error}, State) ->
+    error_logger:error_msg("Error in acceptor: ~p~n", [Error]),
+    {stop, Error, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> ok
+%% Description: Called by gen_server just before the process stops.
+%% No cleanup is done here; both arguments are ignored and the return
+%% value is not used by gen_server.
+%%--------------------------------------------------------------------
+terminate(_Why, _ServerState) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Code upgrade hook. The state needs no conversion, so
+%% it is handed back unchanged.
+%%--------------------------------------------------------------------
+code_change(_FromVsn, ServerState, _Options) ->
+    {ok, ServerState}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%% Registers a freshly accepted client socket with inet_db and copies a
+%% fixed set of options from the listen socket onto it. Returns ok on
+%% success. If the options cannot be read or applied, the client socket
+%% is closed and the failing result is returned.
+set_sockopt(ListenSocket, ClientSocket) ->
+    true = inet_db:register_socket(ClientSocket, inet_tcp),
+    CloseAndReturn = fun(Failure) ->
+                             gen_tcp:close(ClientSocket),
+                             Failure
+                     end,
+    Inherited = [active, nodelay, keepalive, delay_send, priority, tos],
+    case prim_inet:getopts(ListenSocket, Inherited) of
+        {ok, Opts} ->
+            case prim_inet:setopts(ClientSocket, Opts) of
+                ok       -> ok;
+                SetError -> CloseAndReturn(SetError)
+            end;
+        GetError ->
+            CloseAndReturn(GetError)
+    end.
+
+%% Spawns a thrift_processor for a newly accepted client socket.
+%% The processor receives a generator fun which it runs in its own
+%% process: the fun first takes over the socket from this server and
+%% then stacks socket transport, buffered transport and binary protocol
+%% on top of it. The same protocol is used for input and output.
+start_processor(Socket, Service, Handler) ->
+    Owner = self(),
+
+    BuildProtocols =
+        fun() ->
+                % Become the controlling process
+                ok = take_socket(Owner, Socket),
+                {ok, Raw}      = thrift_socket_transport:new(Socket),
+                {ok, Buffered} = thrift_buffered_transport:new(Raw),
+                {ok, Proto}    = thrift_binary_protocol:new(Buffered),
+                {ok, Proto, Proto}
+        end,
+
+    spawn(thrift_processor, init, [{Owner, BuildProtocols, Service, Handler}]).
diff --git a/lib/erl/src/thrift_service.erl b/lib/erl/src/thrift_service.erl
new file mode 100644
index 0000000..2ed7b57
--- /dev/null
+++ b/lib/erl/src/thrift_service.erl
@@ -0,0 +1,25 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_service).
+
+-export([behaviour_info/1]).
+
+%% Behaviour definition: a service module must export function_info/2.
+%% Any other query yields undefined, matching
+%% thrift_protocol:behaviour_info/1, instead of raising
+%% function_clause.
+behaviour_info(callbacks) ->
+    [{function_info, 2}];
+behaviour_info(_Else) ->
+    undefined.
diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl
new file mode 100644
index 0000000..62bdfda
--- /dev/null
+++ b/lib/erl/src/thrift_socket_server.erl
@@ -0,0 +1,249 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_socket_server).
+
+-behaviour(gen_server).
+
+-export([start/1, stop/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
+ handle_info/2]).
+
+-export([acceptor_loop/1]).
+
+-record(thrift_socket_server,
+ {port,
+ service,
+ handler,
+ name,
+ max=2048,
+ ip=any,
+ listen=null,
+ acceptor=null,
+ socket_opts=[{recv_timeout, 500}]
+ }).
+
+%% Starts a socket server. Accepts either a ready-made
+%% #thrift_socket_server{} record or an option list, which is first
+%% turned into such a record by parse_options/1.
+start(State=#thrift_socket_server{}) ->
+    start_server(State);
+start(Options) ->
+    Parsed = parse_options(Options),
+    start(Parsed).
+
+%% Asks a socket server to stop (asynchronously, via cast).
+%% The server may be given as a locally registered name, a pid,
+%% {local, Name}, {global, Name}, or as the option list it was started
+%% with, in which case the name option is used.
+%% A {global, Name} reference is passed to gen_server:cast/2 as is, so
+%% the globally registered process is addressed rather than a local
+%% process that happens to share the name.
+stop(Name) when is_atom(Name) ->
+    gen_server:cast(Name, stop);
+stop(Pid) when is_pid(Pid) ->
+    gen_server:cast(Pid, stop);
+stop({local, Name}) ->
+    stop(Name);
+stop({global, _Name} = GlobalRef) ->
+    gen_server:cast(GlobalRef, stop);
+stop(Options) ->
+    State = parse_options(Options),
+    stop(State#thrift_socket_server.name).
+
+%% Internal API
+
+%% Builds a server record from an option list, starting from the
+%% record defaults.
+parse_options(Options) ->
+    Defaults = #thrift_socket_server{},
+    parse_options(Options, Defaults).
+
+%% Folds an option list into the server record, one option at a time.
+%%   {name, N}        - string or atom becomes {local, Atom}; any other
+%%                      term is stored as given.
+%%   {port, P}        - a string is converted to an integer.
+%%   {ip, Ip}         - any, an address tuple, or a string that is
+%%                      parsed into an address tuple.
+%%   {socket_opts, L} - non-empty list of transport options.
+%%   {handler, H}, {service, S} - stored as given.
+%%   {max, M}         - integer or its string form.
+%% An option that matches none of the clauses raises function_clause.
+parse_options([], Acc) ->
+    Acc;
+parse_options([{name, Str} | More], Acc) when is_list(Str) ->
+    parse_options(More,
+                  Acc#thrift_socket_server{name={local, list_to_atom(Str)}});
+parse_options([{name, Atom} | More], Acc) when is_atom(Atom) ->
+    parse_options(More, Acc#thrift_socket_server{name={local, Atom}});
+parse_options([{name, Other} | More], Acc) ->
+    parse_options(More, Acc#thrift_socket_server{name=Other});
+parse_options([{port, Str} | More], Acc) when is_list(Str) ->
+    parse_options(More,
+                  Acc#thrift_socket_server{port=list_to_integer(Str)});
+parse_options([{port, Number} | More], Acc) ->
+    parse_options(More, Acc#thrift_socket_server{port=Number});
+parse_options([{ip, Ip} | More], Acc) ->
+    Address = case Ip of
+                  any ->
+                      any;
+                  _ when is_tuple(Ip) ->
+                      Ip;
+                  _ when is_list(Ip) ->
+                      {ok, Parsed} = inet_parse:address(Ip),
+                      Parsed
+              end,
+    parse_options(More, Acc#thrift_socket_server{ip=Address});
+parse_options([{socket_opts, L} | More], Acc) when is_list(L), length(L) > 0 ->
+    parse_options(More, Acc#thrift_socket_server{socket_opts=L});
+parse_options([{handler, Module} | More], Acc) ->
+    parse_options(More, Acc#thrift_socket_server{handler=Module});
+parse_options([{service, Module} | More], Acc) ->
+    parse_options(More, Acc#thrift_socket_server{service=Module});
+parse_options([{max, Max} | More], Acc) ->
+    Limit = case Max of
+                _ when is_list(Max)    -> list_to_integer(Max);
+                _ when is_integer(Max) -> Max
+            end,
+    parse_options(More, Acc#thrift_socket_server{max=Limit}).
+
+%% Starts the gen_server, linked to the caller. When no name was
+%% configured the server is started unregistered; otherwise the
+%% configured name is used for registration.
+start_server(State=#thrift_socket_server{name=undefined}) ->
+    gen_server:start_link(?MODULE, State, []);
+start_server(State=#thrift_socket_server{name=Name}) ->
+    gen_server:start_link(Name, ?MODULE, State, []).
+
+%% gen_server init: opens the listen socket and starts the first
+%% acceptor. Exits are trapped so that acceptor and connection
+%% processes can be tracked in handle_info/2.
+%% If listening fails with eacces on a port below 1024, a second
+%% attempt is made through fdsrv. The "listening" message is logged
+%% only when a listen socket was actually obtained, by either route.
+%% Returns {ok, State} or {stop, Reason}.
+init(State=#thrift_socket_server{ip=Ip, port=Port}) ->
+    process_flag(trap_exit, true),
+    BaseOpts = [binary,
+                {reuseaddr, true},
+                {packet, 0},
+                {backlog, 4096},
+                {recbuf, 8192},
+                {active, false}],
+    Opts = case Ip of
+               any ->
+                   BaseOpts;
+               Ip ->
+                   [{ip, Ip} | BaseOpts]
+           end,
+    Result = case gen_tcp_listen(Port, Opts, State) of
+                 {stop, eacces} ->
+                     listen_privileged(Port, Opts, State);
+                 Other ->
+                     Other
+             end,
+    case Result of
+        {ok, _} ->
+            error_logger:info_msg("thrift service listening on port ~p", [Port]);
+        _ ->
+            ok
+    end,
+    Result.
+
+%% Second attempt at binding after eacces: the fdsrv module allows
+%% another shot at ports which require root access. Ports of 1024 and
+%% above are not retried.
+listen_privileged(Port, Opts, State) ->
+    case Port < 1024 of
+        true ->
+            case fdsrv:start() of
+                {ok, _} ->
+                    case fdsrv:bind_socket(tcp, Port) of
+                        {ok, Fd} ->
+                            gen_tcp_listen(Port, [{fd, Fd} | Opts], State);
+                        _ ->
+                            {stop, fdsrv_bind_failed}
+                    end;
+                _ ->
+                    {stop, fdsrv_start_failed}
+            end;
+        false ->
+            {stop, eacces}
+    end.
+
+%% Opens the listen socket. On success the socket and the port it is
+%% actually bound to are stored in the state, an acceptor is started and
+%% {ok, State} is returned. On failure {stop, Reason} is returned.
+gen_tcp_listen(Port, Opts, State) ->
+    case gen_tcp:listen(Port, Opts) of
+        {error, Reason} ->
+            {stop, Reason};
+        {ok, Listen} ->
+            {ok, BoundPort} = inet:port(Listen),
+            Listening = State#thrift_socket_server{listen=Listen,
+                                                   port=BoundPort},
+            {ok, new_acceptor(Listening)}
+    end.
+
+%% Starts a new acceptor process, linked to this server, and records
+%% its pid in the state. When the connection budget (max) has reached
+%% 0, no acceptor is started and the acceptor field is set to null.
+new_acceptor(State=#thrift_socket_server{max=0}) ->
+    error_logger:error_msg("Not accepting new connections"),
+    State#thrift_socket_server{acceptor=null};
+new_acceptor(State=#thrift_socket_server{listen=Listen,
+                                         service=Service, handler=Handler,
+                                         socket_opts=Opts
+                                        }) ->
+    Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
+                              [{self(), Listen, Service, Handler, Opts}]),
+    State#thrift_socket_server{acceptor=Pid}.
+
+%% Body of an acceptor process. Blocks in accept on the listen socket.
+%% Once a client connects, the server is told (so that it can start the
+%% next acceptor) and this process turns into the processor for that
+%% connection. The same protocol instance is used for input and output.
+%% If accept fails, the process exits with {error, closed} or
+%% {error, accept_failed}.
+acceptor_loop({Server, Listen, Service, Handler, SocketOpts})
+  when is_pid(Server), is_list(SocketOpts) ->
+    case catch gen_tcp:accept(Listen) of % infinite timeout
+        {ok, Socket} ->
+            gen_server:cast(Server, {accepted, self()}),
+            ProtoGen = fun() ->
+                               {ok, SocketTransport}   = thrift_socket_transport:new(Socket, SocketOpts),
+                               {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
+                               {ok, Protocol}          = thrift_binary_protocol:new(BufferedTransport),
+                               {ok, Protocol, Protocol}
+                       end,
+            thrift_processor:init({Server, ProtoGen, Service, Handler});
+        {error, closed} ->
+            exit({error, closed});
+        Other ->
+            error_logger:error_report(
+              [{application, thrift},
+               "Accept failed error",
+               lists:flatten(io_lib:format("~p", [Other]))]),
+            exit({error, accept_failed})
+    end.
+
+%% Synchronous requests. {get, port} replies with the port stored in
+%% the state; every other request is answered with error.
+handle_call({get, port}, _From, State=#thrift_socket_server{port=Port}) ->
+    {reply, Port, State};
+handle_call(_Unknown, _From, State) ->
+    {reply, error, State}.
+
+%% Asynchronous requests.
+%%   {accepted, Pid} - sent by the current acceptor once it has taken a
+%%                     connection: the connection budget is reduced by
+%%                     one and a new acceptor is started.
+%%   stop            - stops the server with reason normal.
+handle_cast({accepted, Pid},
+            State=#thrift_socket_server{acceptor=Pid, max=Max}) ->
+    Busier = State#thrift_socket_server{max=Max - 1},
+    {noreply, new_acceptor(Busier)};
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+%% Shutdown hook: closes the listen socket. For ports below 1024,
+%% fdsrv is stopped as well, ignoring any failure to do so.
+terminate(_Reason, #thrift_socket_server{listen=Listen, port=Port}) ->
+    gen_tcp:close(Listen),
+    if
+        Port < 1024 ->
+            catch fdsrv:stop(),
+            ok;
+        true ->
+            ok
+    end.
+
+%% Code upgrade hook. The state needs no conversion. gen_server
+%% requires the {ok, NewState} form; returning the bare state would
+%% make a code upgrade fail.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Handles exit signals of linked processes (exits are trapped) and
+%% stray messages.
+%%   - The current acceptor exited normally: start a new one.
+%%   - The current acceptor exited abnormally: report it, wait 100 ms
+%%     and start a new one.
+%%   - Some other linked process exited: reasons other than normal and
+%%     shutdown are reported, the connection budget is increased by one
+%%     and, if there is currently no acceptor (null), one is started.
+%%   - Anything else is logged together with the state.
+handle_info({'EXIT', Pid, normal},
+            State=#thrift_socket_server{acceptor=Pid}) ->
+    {noreply, new_acceptor(State)};
+handle_info({'EXIT', Pid, Reason},
+            State=#thrift_socket_server{acceptor=Pid}) ->
+    error_logger:error_report({?MODULE, ?LINE,
+                               {acceptor_error, Reason}}),
+    timer:sleep(100),
+    {noreply, new_acceptor(State)};
+handle_info({'EXIT', _LoopPid, Reason},
+            State=#thrift_socket_server{acceptor=Acceptor, max=Max}) ->
+    case lists:member(Reason, [normal, shutdown]) of
+        true ->
+            ok;
+        false ->
+            error_logger:error_report({?MODULE, ?LINE,
+                                       {child_error, Reason, erlang:get_stacktrace()}})
+    end,
+    Freed = State#thrift_socket_server{max=Max + 1},
+    case Acceptor of
+        null -> {noreply, new_acceptor(Freed)};
+        _    -> {noreply, Freed}
+    end;
+handle_info(Info, State) ->
+    error_logger:info_report([{'INFO', Info}, {'State', State}]),
+    {noreply, State}.
diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl
new file mode 100644
index 0000000..fcd6944
--- /dev/null
+++ b/lib/erl/src/thrift_socket_transport.erl
@@ -0,0 +1,119 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_socket_transport).
+
+-behaviour(thrift_transport).
+
+-export([new/1,
+ new/2,
+ write/2, read/2, flush/1, close/1,
+
+ new_transport_factory/3]).
+
+-record(data, {socket, % socket handed to gen_tcp:send/2, gen_tcp:recv/3 and gen_tcp:close/1
+ recv_timeout=infinity}). % third argument of gen_tcp:recv/3 in read/2; new/2 may override the default
+
+%% new/1,2 wrap an already-open gen_tcp socket in a thrift_transport. Only the
+%% {recv_timeout, Ms} option is read: a positive integer Ms replaces the default
+%% receive timeout of `infinity'; any other value is ignored.
+new(Socket) ->
+    new(Socket, []).
+
+new(Socket, Opts) when is_list(Opts) ->
+    thrift_transport:new(?MODULE, init_data(Socket, lists:keyfind(recv_timeout, 1, Opts))).
+
+%% Builds #data{} from the socket and the keyfind result (an option tuple or `false').
+init_data(Socket, {recv_timeout, Ms}) when is_integer(Ms), Ms > 0 ->
+    #data{socket=Socket, recv_timeout=Ms};
+init_data(Socket, _) -> #data{socket=Socket}.
+
+%% Data :: iolist(). Sent as is with gen_tcp:send/2, whose result is returned.
+write(#data{socket = Sock}, Data) ->
+    gen_tcp:send(Sock, Data).
+
+%% Receives with the stored recv_timeout; a timeout is logged and closes the socket before being returned.
+read(#data{socket=Sock, recv_timeout=Wait}, Len) when is_integer(Len), Len >= 0 ->
+    after_recv(Sock, gen_tcp:recv(Sock, Len, Wait)).
+
+after_recv(Sock, {error, timeout} = Timeout) ->
+    error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Sock)]),
+    gen_tcp:close(Sock),
+    Timeout;
+after_recv(_Sock, Result) -> Result.
+
+%% Nothing is buffered here - write/2 sends straight to the socket - so flushing
+%% is a no-op that always returns `ok'.
+flush(_State) -> ok.
+
+%% Closes the underlying socket and returns gen_tcp:close/1's result.
+close(#data{socket = Sock}) ->
+    gen_tcp:close(Sock).
+
+%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+
+%% The following "local" record is filled in by parse_factory_options/2
+%% below. These options can be passed to new_transport_factory/3 in a
+%% proplists-style option list. They're parsed like this so it is an O(n)
+%% operation instead of O(n^2)
+-record(factory_opts, {connect_timeout = infinity, % timeout argument of gen_tcp:connect/4
+ sockopts = [], % appended to the fixed socket options used for the connection
+ framed = false}). % true selects thrift_framed_transport, false thrift_buffered_transport
+
+%% Applies the option tuples left to right to the #factory_opts{} accumulator (a later
+%% duplicate wins); a term matching no set_factory_option/2 clause raises function_clause.
+parse_factory_options(Options, Opts) ->
+    lists:foldl(fun set_factory_option/2, Opts, Options).
+
+set_factory_option({framed, Bool}, Opts) when is_boolean(Bool) -> Opts#factory_opts{framed=Bool};
+set_factory_option({sockopts, OptList}, Opts) when is_list(OptList) -> Opts#factory_opts{sockopts=OptList};
+set_factory_option({connect_timeout, TO}, Opts) when TO =:= infinity; is_integer(TO) ->
+    Opts#factory_opts{connect_timeout=TO}.
+
+%%
+%% Generates a "transport factory" function - a zero-argument fun that connects to
+%% Host:Port and returns {ok, Transport} for the wrapped socket, or else the result
+%% of the caught gen_tcp:connect/4 call. Options are those of parse_factory_options/2.
+%% The fun can be passed into a protocol factory to generate a connection to a
+%% thrift server over a socket.
+%%
+new_transport_factory(Host, Port, Options) ->
+    #factory_opts{connect_timeout = ConnectTimeout,
+                  sockopts = ExtraSockOpts,
+                  framed = Framed} = parse_factory_options(Options, #factory_opts{}),
+    SockOpts = [binary, {packet, 0}, {active, false}, {nodelay, true} | ExtraSockOpts],
+    Factory =
+        fun() ->
+            %% Old-style catch: an exception raised by connect is handed back to the
+            %% caller as a value (e.g. {'EXIT', _}) instead of propagating.
+            case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of
+                {ok, Sock} -> wrap_socket(Sock, Framed);
+                Error -> Error
+            end
+        end,
+    {ok, Factory}.
+
+%% Puts a socket transport around Sock and layers framing or buffering on top.
+wrap_socket(Sock, Framed) ->
+    {ok, Transport} = thrift_socket_transport:new(Sock),
+    {ok, _} = case Framed of
+                  true -> thrift_framed_transport:new(Transport);
+                  false -> thrift_buffered_transport:new(Transport)
+              end.
diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl
new file mode 100644
index 0000000..20c4b5d
--- /dev/null
+++ b/lib/erl/src/thrift_transport.erl
@@ -0,0 +1,57 @@
+%%
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements. See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership. The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+
+-module(thrift_transport).
+
+-export([behaviour_info/1]).
+
+-export([new/2,
+ write/2,
+ read/2,
+ flush/1,
+ close/1
+ ]).
+
+%% Behaviour definition: the callbacks a module declaring
+%% -behaviour(thrift_transport) has to export. write/2, read/2, flush/1 and
+%% close/1 of this module dispatch to them.
+behaviour_info(callbacks) ->
+    [{read, 2}, {write, 2},
+     {flush, 1}, {close, 1}].
+
+%% A transport pairs a callback module with that module's private state; new/2 returns {ok, Transport}.
+-record(transport, {module, data}).
+
+new(Module, Data) when is_atom(Module) ->
+    {ok, #transport{data = Data, module = Module}}.
+
+%% Data :: iolist(). Forwards it to the write/2 callback of the transport's module,
+%% with that module's private state as first argument.
+write(Transport, Data) ->
+    (Transport#transport.module):write(Transport#transport.data, Data).
+
+%% Forwards to the read/2 callback of the transport's module; Len must be an integer.
+read(Transport, Len) when is_integer(Len) ->
+    (Transport#transport.module):read(Transport#transport.data, Len).
+
+%% Hands the module's private state to its flush/1 callback and returns the result.
+flush(#transport{data = State, module = Mod}) -> Mod:flush(State).
+
+%% Hands the module's private state to its close/1 callback and returns the result.
+close(#transport{data = State, module = Mod}) -> Mod:close(State).