THRIFT-2815 Support for Multiplexing Services on any Transport, Protocol and Server
Client: Haxe
Patch: Jens Geyer
This closes #262
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx
new file mode 100644
index 0000000..7354ff4
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProcessor.hx
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import haxe.ds.StringMap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TProcessor;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+ * To do so, you instantiate the processor and then register additional processors with it,
+ * as shown in the following example:
+ *
+ * TMultiplexedProcessor processor = new TMultiplexedProcessor();
+ *
+ * processor.registerProcessor(
+ * "Calculator",
+ * new Calculator.Processor(new CalculatorHandler()));
+ *
+ * processor.registerProcessor(
+ * "WeatherReport",
+ * new WeatherReport.Processor(new WeatherReportHandler()));
+ *
+ * TServerTransport t = new TServerSocket(9090);
+ * TSimpleServer server = new TSimpleServer(processor, t);
+ *
+ * server.serve();
+ */
+class TMultiplexedProcessor implements TProcessor
+{
+ private var serviceProcessorMap : StringMap<TProcessor> = new StringMap<TProcessor>();
+ private var defaultProcessor : TProcessor = null;
+
+ /**
+ * 'Register' a service with this TMultiplexedProcessor. This allows us to broker
+ * requests to individual services by using the service name to select them at request time.
+ *
+ * Args:
+ * - serviceName Name of a service, has to be identical to the name
+ * declared in the Thrift IDL, e.g. "WeatherReport".
+ * - processor Implementation of a service, ususally referred to as "handlers",
+ * e.g. WeatherReportHandler implementing WeatherReport.Iface.
+ */
+ public function RegisterProcessor(serviceName : String, processor : TProcessor, asDefault : Bool = false) : Void {
+ serviceProcessorMap.set(serviceName, processor);
+ if ( asDefault) {
+ if( defaultProcessor != null) {
+ throw new TApplicationException( TApplicationException.UNKNOWN, "Can't have multiple default processors");
+ } else {
+ defaultProcessor = processor;
+ }
+ }
+ }
+
+
+ private function Fail( oprot : TProtocol, message : TMessage, extype : Int, etxt : String) : Void {
+ var appex = new TApplicationException( extype, etxt);
+
+ var newMessage = new TMessage(message.name, TMessageType.EXCEPTION, message.seqid);
+
+ oprot.writeMessageBegin(newMessage);
+ appex.write( oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+
+ /**
+ * This implementation of process performs the following steps:
+ *
+ * - Read the beginning of the message.
+ * - Extract the service name from the message.
+ * - Using the service name to locate the appropriate processor.
+ * - Dispatch to the processor, with a decorated instance of TProtocol
+ * that allows readMessageBegin() to return the original TMessage.
+ *
+ * Throws an exception if
+ * - the message type is not CALL or ONEWAY,
+ * - the service name was not found in the message, or
+ * - the service name has not been RegisterProcessor()ed.
+ */
+ public function process( iprot : TProtocol, oprot : TProtocol) : Bool {
+ /* Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+ message header. This pulls the message "off the wire", which we'll
+ deal with at the end of this method. */
+
+ var message : TMessage = iprot.readMessageBegin();
+ var methodName : String = "";
+
+ if ((message.type != TMessageType.CALL) && (message.type != TMessageType.ONEWAY))
+ {
+ Fail(oprot, message,
+ TApplicationException.INVALID_MESSAGE_TYPE,
+ "Message type CALL or ONEWAY expected");
+ return false;
+ }
+
+ // Extract the service name
+ var actualProcessor : TProcessor = null;
+ var index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
+ if (index < 0) {
+ // fallback to default processor
+ methodName = message.name;
+ actualProcessor = defaultProcessor;
+ if( actualProcessor == null) {
+ Fail(oprot, message,
+ TApplicationException.INVALID_PROTOCOL,
+ "Service name not found in message name: " + message.name + " and no default processor defined. " +
+ "Did you forget to use a TMultiplexProtocol in your client?");
+ return false;
+ }
+
+ } else {
+ // service name given
+ var serviceName = message.name.substring(0, index);
+ methodName = message.name.substring( serviceName.length + TMultiplexedProtocol.SEPARATOR.length);
+ actualProcessor = serviceProcessorMap.get( serviceName);
+ if( actualProcessor == null) {
+ Fail(oprot, message,
+ TApplicationException.INTERNAL_ERROR,
+ "Service name not found: " + serviceName + ". " +
+ "Did you forget to call RegisterProcessor()?");
+ return false;
+ }
+ }
+
+ // Create a new TMessage, removing the service name
+ // Dispatch processing to the stored processor
+ var newMessage = new TMessage( methodName, message.type, message.seqid);
+ var storedMsg = new StoredMessageProtocol( iprot, newMessage);
+ return actualProcessor.process( storedMsg, oprot);
+ }
+}
+
+
+/**
+ * Our goal was to work with any protocol. In order to do that, we needed
+ * to allow them to call readMessageBegin() and get a TMessage in exactly
+ * the standard format, without the service name prepended to TMessage.name.
+ */
+class StoredMessageProtocol extends TProtocolDecorator
+{
+ private var messageBegin : TMessage;
+
+ public function new( protocol : TProtocol, messageBegin : TMessage) {
+ super( protocol);
+ this.messageBegin = messageBegin;
+ }
+
+ public override function readMessageBegin() : TMessage {
+ return messageBegin;
+ }
+}
+
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx
new file mode 100644
index 0000000..cacd1d7
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TMultiplexedProtocol.hx
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TMultiplexedProtocol is a protocol-independent concrete decorator that allows a Thrift
+ * client to communicate with a multiplexing Thrift server, by prepending the service name
+ * to the function name during function calls.
+ *
+ * NOTE: THIS IS NOT TO BE USED BY SERVERS.
+ * On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
+ *
+ * This example uses a single socket transport to invoke two services:
+ *
+ * TSocket transport = new TSocket("localhost", 9090);
+ * transport.open();
+ *
+ * TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ *
+ * TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+ * Calculator.Client service = new Calculator.Client(mp);
+ *
+ * TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+ * WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+ *
+ * System.out.println(service.add(2,2));
+ * System.out.println(service2.getTemperature());
+ *
+ */
+class TMultiplexedProtocol extends TProtocolDecorator {
+
+ /** Used to delimit the service name from the function name */
+ public static inline var SEPARATOR : String = ":";
+
+ private var service : String;
+
+ /**
+ * Wrap the specified protocol, allowing it to be used to communicate with a
+ * multiplexing server. The <code>serviceName</code> is required as it is
+ * prepended to the message header so that the multiplexing server can broker
+ * the function call to the proper service.
+ *
+ * Args:
+ * protocol Your communication protocol of choice, e.g. TBinaryProtocol
+ * serviceName The service name of the service communicating via this protocol.
+ */
+ public function new( protocol : TProtocol, serviceName : String) {
+ super( protocol);
+ service = serviceName;
+ }
+
+ /**
+ * Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
+ * Args:
+ * tMessage The original message.
+ */
+ public override function writeMessageBegin( message : TMessage) : Void {
+ switch( message.type)
+ {
+ case TMessageType.CALL:
+ super.writeMessageBegin(new TMessage(
+ service + SEPARATOR + message.name,
+ message.type,
+ message.seqid));
+
+ case TMessageType.ONEWAY:
+ super.writeMessageBegin(new TMessage(
+ service + SEPARATOR + message.name,
+ message.type,
+ message.seqid));
+
+ default:
+ super.writeMessageBegin(message);
+ }
+ }
+}
+
diff --git a/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx b/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx
new file mode 100644
index 0000000..e43d2d9
--- /dev/null
+++ b/lib/haxe/src/org/apache/thrift/protocol/TProtocolDecorator.hx
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.protocol;
+
+import haxe.io.Bytes;
+import haxe.Int64;
+
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * TProtocolDecorator forwards all requests to an enclosed TProtocol instance,
+ * providing a way to author concise concrete decorator subclasses. While it has
+ * no abstract methods, it is marked abstract as a reminder that by itself,
+ * it does not modify the behaviour of the enclosed TProtocol.
+ *
+ * See p.175 of Design Patterns (by Gamma et al.)
+ * See TMultiplexedProtocol
+ */
+class TProtocolDecorator implements TProtocol
+{
+ private var wrapped : TProtocol;
+
+ /**
+ * Encloses the specified protocol.
+ * @param protocol All operations will be forward to this protocol. Must be non-null.
+ */
+ private function new( protocol : TProtocol) // not to be instantiated, must derive a class
+ {
+ wrapped = protocol;
+ }
+
+ public function getTransport() : TTransport {
+ return wrapped.getTransport();
+ }
+
+ public function writeMessageBegin( value : TMessage) : Void {
+ wrapped.writeMessageBegin( value);
+ }
+
+ public function writeMessageEnd() : Void {
+ wrapped.writeMessageEnd();
+ }
+
+ public function writeStructBegin(value : TStruct) : Void {
+ wrapped.writeStructBegin( value);
+ }
+
+ public function writeStructEnd() : Void {
+ wrapped.writeStructEnd();
+ }
+
+ public function writeFieldBegin(value : TField) : Void {
+ wrapped.writeFieldBegin( value);
+ }
+
+ public function writeFieldEnd() : Void {
+ wrapped.writeFieldEnd();
+ }
+
+ public function writeFieldStop() : Void {
+ wrapped.writeFieldStop();
+ }
+
+ public function writeMapBegin( value : TMap) : Void {
+ wrapped.writeMapBegin( value);
+ }
+
+ public function writeMapEnd() : Void {
+ wrapped.writeMapEnd();
+ }
+
+ public function writeListBegin( value : TList) : Void {
+ wrapped.writeListBegin( value);
+ }
+
+ public function writeListEnd() : Void {
+ wrapped.writeListEnd();
+ }
+
+ public function writeSetBegin( value : TSet) : Void {
+ wrapped.writeSetBegin( value);
+ }
+
+ public function writeSetEnd() : Void {
+ wrapped.writeSetEnd();
+ }
+
+ public function writeBool(value : Bool) : Void {
+ wrapped.writeBool( value);
+ }
+
+ public function writeByte(value : Int) : Void {
+ wrapped.writeByte( value);
+ }
+
+ public function writeI16(value : Int) : Void {
+ wrapped.writeI16( value);
+ }
+
+ public function writeI32(value : Int) : Void {
+ wrapped.writeI32( value);
+ }
+
+ public function writeI64(value : haxe.Int64) : Void {
+ wrapped.writeI64( value);
+ }
+
+ public function writeDouble(value : Float) : Void {
+ wrapped.writeDouble( value);
+ }
+
+ public function writeString(value : String) : Void {
+ wrapped.writeString( value);
+ }
+
+ public function writeBinary(value : Bytes ) : Void {
+ wrapped.writeBinary( value);
+ }
+
+ public function readMessageBegin() : TMessage {
+ return wrapped.readMessageBegin();
+ }
+
+ public function readMessageEnd() : Void {
+ wrapped.readMessageEnd();
+ }
+
+ public function readStructBegin() : TStruct {
+ return wrapped.readStructBegin();
+ }
+
+ public function readStructEnd() : Void {
+ wrapped.readStructEnd();
+ }
+
+ public function readFieldBegin() : TField {
+ return wrapped.readFieldBegin();
+ }
+
+ public function readFieldEnd() : Void {
+ wrapped.readFieldEnd();
+ }
+
+ public function readMapBegin() : TMap {
+ return wrapped.readMapBegin();
+ }
+
+ public function readMapEnd() : Void {
+ wrapped.readMapEnd();
+ }
+
+ public function readListBegin() : TList {
+ return wrapped.readListBegin();
+ }
+
+ public function readListEnd() : Void {
+ wrapped.readListEnd();
+ }
+
+ public function readSetBegin() : TSet {
+ return wrapped.readSetBegin();
+ }
+
+ public function readSetEnd() : Void {
+ wrapped.readSetEnd();
+ }
+
+ public function readBool() : Bool
+ {
+ return wrapped.readBool();
+ }
+
+ public function readByte() : Int {
+ return wrapped.readByte();
+ }
+
+ public function readI16() : Int {
+ return wrapped.readI16();
+ }
+
+ public function readI32() : Int {
+ return wrapped.readI32();
+ }
+
+ public function readI64() : haxe.Int64 {
+ return wrapped.readI64();
+ }
+
+ public function readDouble() : Float {
+ return wrapped.readDouble();
+ }
+
+ public function readString() : String {
+ return wrapped.readString();
+ }
+
+ public function readBinary() : Bytes {
+ return wrapped.readBinary();
+ }
+}
diff --git a/lib/haxe/test/HaxeTests.hxproj b/lib/haxe/test/HaxeTests.hxproj
index 4e8929b..3beed82 100644
--- a/lib/haxe/test/HaxeTests.hxproj
+++ b/lib/haxe/test/HaxeTests.hxproj
@@ -53,14 +53,16 @@
<hidden path="python.hxml" />
</hiddenPaths>
<!-- Executed before build -->
- <preBuildCommand>thrift -r -gen haxe ../../../test/ThriftTest.thrift</preBuildCommand>
+ <preBuildCommand>thrift -r -gen haxe ../../../test/ThriftTest.thrift
+thrift -r -gen haxe ../../../contrib/async-test/aggr.thrift
+thrift -r -gen haxe ../../../lib/rb/benchmark/Benchmark.thrift</preBuildCommand>
<!-- Executed after build -->
<postBuildCommand alwaysRun="False" />
<!-- Other project options -->
<options>
<option showHiddenPaths="False" />
- <option testMovie="Unknown" />
- <option testMovieCommand="" />
+ <option testMovie="Custom" />
+ <option testMovieCommand="bin/HaxeTests/Main.exe server multiplex" />
</options>
<!-- Plugin storage -->
<storage />
diff --git a/lib/haxe/test/Makefile.am b/lib/haxe/test/Makefile.am
index 357436c..13b4266 100644
--- a/lib/haxe/test/Makefile.am
+++ b/lib/haxe/test/Makefile.am
@@ -20,15 +20,25 @@
THRIFT = $(top_srcdir)/compiler/cpp/thrift
THRIFTCMD = $(THRIFT) --gen haxe -r
THRIFTTEST = $(top_srcdir)/test/ThriftTest.thrift
+AGGR = $(top_srcdir)/contrib/async-test/aggr.thrift
+BENCHMARK = $(top_srcdir)/lib/rb/benchmark/Benchmark.thrift
BIN_CPP = bin/Main-debug
gen-haxe/thrift/test/ThriftTest.hx: $(THRIFTTEST)
$(THRIFTCMD) $(THRIFTTEST)
+gen-haxe/thrift/test/Aggr.hx: $(AGGR)
+ $(THRIFTCMD) $(AGGR)
+
+gen-haxe/thrift/test/BenchmarkService.hx: $(BENCHMARK)
+ $(THRIFTCMD) $(BENCHMARK)
+
all-local: $(BIN_CPP)
-$(BIN_CPP): gen-haxe/thrift/test/ThriftTest.hx
+$(BIN_CPP): gen-haxe/thrift/test/ThriftTest.hx \
+ gen-haxe/thrift/test/Aggr.hx \
+ gen-haxe/thrift/test/BenchmarkService.hx
$(HAXE) --cwd . cpp.hxml
diff --git a/lib/haxe/test/make_all.bat b/lib/haxe/test/make_all.bat
index ee18f10..0314e18 100644
--- a/lib/haxe/test/make_all.bat
+++ b/lib/haxe/test/make_all.bat
@@ -26,7 +26,9 @@
set path=%HAXEPATH%;%HAXEPATH%\..\neko;%path%
rem # invoke Thrift comnpiler
-thrift -r -gen haxe ..\..\..\test\ThriftTest.thrift
+thrift -r -gen haxe ..\..\..\test\ThriftTest.thrift
+thrift -r -gen haxe ..\..\..\contrib\async-test\aggr.thrift
+thrift -r -gen haxe ..\..\..\lib\rb\benchmark\Benchmark.thrift
if errorlevel 1 goto STOP
rem # invoke Haxe compiler for all targets
diff --git a/lib/haxe/test/make_all.sh b/lib/haxe/test/make_all.sh
index 13b5754..512f5ec 100644
--- a/lib/haxe/test/make_all.sh
+++ b/lib/haxe/test/make_all.sh
@@ -20,6 +20,8 @@
# invoke Thrift comnpiler
thrift -r -gen haxe ../../../test/ThriftTest.thrift
+thrift -r -gen haxe ../../../contrib/async-test/aggr.thrift
+thrift -r -gen haxe ../../../lib/rb/benchmark/Benchmark.thrift
# output folder
if [ ! -d bin ]; then
diff --git a/lib/haxe/test/src/Main.hx b/lib/haxe/test/src/Main.hx
index da0a7f5..6c262d7 100644
--- a/lib/haxe/test/src/Main.hx
+++ b/lib/haxe/test/src/Main.hx
@@ -27,19 +27,67 @@
import thrift.test.*; // generated code
+
+enum WhatTests {
+ Normal;
+ Multiplex;
+}
+
class Main
{
+ static private var tests : WhatTests = Normal;
+ static private var server : Bool = false;
+
+ static private inline var CMDLINEHELP : String
+ = "\nHaxeTests [client|server] [multiplex]\n"
+ + " client|server ... determines run mode for some tests, default is client\n"
+ + " multiplex ........ run multiplex test server or client\n";
+
+ static private function ParseArgs() {
+ #if sys
+
+ var args = Sys.args();
+ if ( args != null) {
+ for ( arg in args) {
+ switch(arg.toLowerCase()) {
+ case "client":
+ server = false;
+ case "server" :
+ server = true;
+ case "multiplex" :
+ tests = Multiplex;
+ default:
+ throw 'Invalid argument "$arg"\n'+CMDLINEHELP;
+ }
+ }
+ }
+
+ #end
+ }
+
static public function main()
{
try
{
- StreamTest.Run();
+ ParseArgs();
+
+ switch( tests) {
+ case Normal:
+ StreamTest.Run(server);
+ case Multiplex:
+ MultiplexTest.Run(server);
+ default:
+ throw "Unhandled test mode $tests";
+ }
trace("All tests completed.");
}
catch( e: Dynamic)
{
trace('$e');
+ #if sys
+ Sys.exit(1); // indicate error
+ #end
}
}
}
\ No newline at end of file
diff --git a/lib/haxe/test/src/MultiplexTest.hx b/lib/haxe/test/src/MultiplexTest.hx
new file mode 100644
index 0000000..3818b66
--- /dev/null
+++ b/lib/haxe/test/src/MultiplexTest.hx
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package;
+
+import haxe.Int64;
+import haxe.Int32;
+
+import org.apache.thrift.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
+import org.apache.thrift.server.*;
+import org.apache.thrift.meta_data.*;
+
+// debug only
+import org.apache.thrift.protocol.TProtocolDecorator;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.protocol.TMultiplexedProcessor;
+
+// generated code imports
+import Aggr;
+import AggrImpl;
+import AggrProcessor;
+import BenchmarkService;
+import BenchmarkServiceImpl;
+import BenchmarkServiceProcessor;
+import Error;
+
+
+class BenchmarkServiceHandler implements BenchmarkService
+{
+ public function new() {
+ }
+
+ public function fibonacci(n : haxe.Int32) : haxe.Int32 {
+ trace('Benchmark.fibonacci($n)');
+ var next : Int;
+ var prev = 0;
+ var result = 1;
+ while( n > 0)
+ {
+ next = result + prev;
+ prev = result;
+ result = next;
+ --n;
+ }
+ return result;
+ }
+}
+
+
+class AggrServiceHandler implements Aggr
+{
+ private var values : List<haxe.Int32> = new List<haxe.Int32>();
+
+ public function new() {
+ }
+
+ public function addValue(value : haxe.Int32) : Void {
+ trace('Aggr.addValue($value)');
+ values.add( value);
+ }
+
+ public function getValues() : List< haxe.Int32> {
+ trace('Aggr.getValues()');
+ return values;
+ }
+}
+
+
+
+class MultiplexTest extends TestBase {
+
+ private inline static var NAME_BENCHMARKSERVICE : String = "BenchmarkService";
+ private inline static var NAME_AGGR : String = "Aggr";
+
+
+ public static override function Run(server : Bool) : Void {
+ if ( server) {
+ RunMultiplexServer();
+ } else {
+ RunMultiplexClient();
+ RunDefaultClient();
+ }
+ }
+
+
+ // run the multiplex server
+ public static override function RunMultiplexServer() : Void {
+ try
+ {
+ var benchHandler : BenchmarkService = new BenchmarkServiceHandler();
+ var benchProcessor : TProcessor = new BenchmarkServiceProcessor( benchHandler);
+
+ var aggrHandler : Aggr = new AggrServiceHandler();
+ var aggrProcessor : TProcessor = new AggrProcessor( aggrHandler);
+
+ var multiplex : TMultiplexedProcessor = new TMultiplexedProcessor();
+ multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor, true); // default
+ multiplex.RegisterProcessor( NAME_AGGR, aggrProcessor);
+
+ // protocol+transport stack
+ var protfact : TProtocolFactory = new TBinaryProtocolFactory(true,true);
+ var servertrans : TServerTransport = new TServerSocket( 9090, 5, false);
+ var transfact : TTransportFactory = new TFramedTransportFactory();
+
+ var server : TServer = new TSimpleServer( multiplex, servertrans, transfact, protfact);
+
+ trace("Starting the server ...");
+ server.Serve();
+ }
+ catch( e : TApplicationException)
+ {
+ TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');
+ }
+ catch( e : TException)
+ {
+ TestBase.Expect(false,'$e');
+ }
+ }
+
+
+ // run multiplex client against multiplex server
+ public static override function RunMultiplexClient() : Void {
+ try
+ {
+ var trans : TTransport;
+ trans = new TSocket("localhost", 9090);
+ trans = new TFramedTransport(trans);
+ trans.open();
+
+ var protocol : TProtocol = new TBinaryProtocol(trans,true,true);
+ var multiplex : TMultiplexedProtocol;
+
+ multiplex = new TMultiplexedProtocol( protocol, NAME_BENCHMARKSERVICE);
+ var bench = new BenchmarkServiceImpl( multiplex);
+
+ multiplex = new TMultiplexedProtocol( protocol, NAME_AGGR);
+ var aggr = new AggrImpl( multiplex);
+
+ trace('calling aggr.add( bench.fibo())...');
+ for( i in 1 ... 10)
+ {
+ trace('$i');
+ aggr.addValue( bench.fibonacci(i));
+ }
+
+ trace('calling aggr ...');
+ var i = 1;
+ var values = aggr.getValues();
+ TestBase.Expect(values != null,'aggr.getValues() == null');
+ for( k in values)
+ {
+ trace('fib($i) = $k');
+ ++i;
+ }
+
+ trans.close();
+ trace('done.');
+
+ }
+ catch( e : TApplicationException)
+ {
+ TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');
+ }
+ catch( e : TException)
+ {
+ TestBase.Expect(false,'$e');
+ }
+ }
+
+
+ // run non-multiplex client against multiplex server to test default fallback
+ public static override function RunDefaultClient() : Void {
+ try
+ {
+ var trans : TTransport;
+ trans = new TSocket("localhost", 9090);
+ trans = new TFramedTransport(trans);
+ trans.open();
+
+ var protocol : TProtocol = new TBinaryProtocol(trans,true,true);
+
+ var bench = new BenchmarkServiceImpl( protocol);
+
+ trace('calling bench (via default) ...');
+ for( i in 1 ... 10)
+ {
+ var k = bench.fibonacci(i);
+ trace('fib($i) = $k');
+ }
+
+ trans.close();
+ trace('done.');
+ }
+ catch( e : TApplicationException)
+ {
+ TestBase.Expect(false,'${e.errorID} ${e.errorMsg}');
+ }
+ catch( e : TException)
+ {
+ TestBase.Expect(false,'$e');
+ }
+ }
+
+}
+
+
diff --git a/lib/haxe/test/src/StreamTest.hx b/lib/haxe/test/src/StreamTest.hx
index 7500eee..244f1ea 100644
--- a/lib/haxe/test/src/StreamTest.hx
+++ b/lib/haxe/test/src/StreamTest.hx
@@ -20,6 +20,7 @@
package;
import haxe.Int64;
+import sys.FileSystem;
import org.apache.thrift.*;
import org.apache.thrift.protocol.*;
@@ -33,15 +34,9 @@
class StreamTest extends TestBase {
- private inline static var tmpfile : String = "bin/data.tmp";
+ private inline static var tmpfile : String = "data.tmp";
- private static function Expect( expr : Bool, info : String, ?pos : haxe.PosInfos) : Void {
- if( ! expr) {
- throw ('Test "$info" failed at '+pos.methodName+' in '+pos.fileName+':'+pos.lineNumber);
- }
- }
-
private static function MakeTestData() : Xtruct {
var data : Xtruct = new Xtruct();
data.string_thing = "Streamtest";
@@ -77,15 +72,22 @@
return data;
}
- public static override function Run() : Void
+ public static override function Run(server : Bool) : Void
{
- var written = WriteData();
- var read = ReadData();
+ try {
+ var written = WriteData();
+ var read = ReadData();
+ FileSystem.deleteFile(tmpfile);
- Expect( read.string_thing == written.string_thing, "string data");
- Expect( read.byte_thing == written.byte_thing, "byte data");
- Expect( read.i32_thing == written.i32_thing, "i32 data");
- Expect( Int64.compare( read.i64_thing, written.i64_thing) == 0, "i64 data");
+ TestBase.Expect( read.string_thing == written.string_thing, "string data");
+ TestBase.Expect( read.byte_thing == written.byte_thing, "byte data");
+ TestBase.Expect( read.i32_thing == written.i32_thing, "i32 data");
+ TestBase.Expect( Int64.compare( read.i64_thing, written.i64_thing) == 0, "i64 data");
+
+ } catch(e:Dynamic) {
+ FileSystem.deleteFile(tmpfile);
+ throw e;
+ }
}
}
diff --git a/lib/haxe/test/src/TestBase.hx b/lib/haxe/test/src/TestBase.hx
index 2a344d6..1232773 100644
--- a/lib/haxe/test/src/TestBase.hx
+++ b/lib/haxe/test/src/TestBase.hx
@@ -25,7 +25,6 @@
import org.apache.thrift.server.*;
import org.apache.thrift.meta_data.*;
-import thrift.test.*; // generated code
class TestBase {
@@ -33,8 +32,15 @@
// override, if necessary
}
- public static function Run() : Void {
+ public static function Run(server : Bool) : Void {
throw new AbstractMethodError();
}
+
+ public static function Expect( expr : Bool, info : String, ?pos : haxe.PosInfos) : Void {
+ if( ! expr) {
+ throw ('Test "$info" failed at '+pos.methodName+' in '+pos.fileName+':'+pos.lineNumber);
+ }
+ }
+
}
\ No newline at end of file