THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas
new file mode 100644
index 0000000..b771d43
--- /dev/null
+++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas
@@ -0,0 +1,182 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Processor.Multiplex;
+
+
+interface
+
+uses
+ SysUtils,
+ Generics.Collections,
+ Thrift,
+ Thrift.Protocol,
+ Thrift.Protocol.Multiplex;
+
+{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+ To do so, you instantiate the processor and then register additional processors with it,
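+ as shown in the following example (written in Java syntax; in Delphi the corresponding
+ calls are TMultiplexedProcessorImpl.Create and IMultiplexedProcessor.RegisterProcessor):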
+
+ TMultiplexedProcessor processor = new TMultiplexedProcessor();
+
+ processor.registerProcessor(
+ "Calculator",
+ new Calculator.Processor(new CalculatorHandler()));
+
+ processor.registerProcessor(
+ "WeatherReport",
+ new WeatherReport.Processor(new WeatherReportHandler()));
+
+ TServerTransport t = new TServerSocket(9090);
+ TSimpleServer server = new TSimpleServer(processor, t);
+
+ server.serve();
+}
+
+
+type
+ IMultiplexedProcessor = interface( IProcessor)
+ ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']
+ // Registers "processor" under "serviceName". Incoming requests are then
+ // brokered to the individual service processors, using the service name
+ // carried in the message name to select the target at request time.
+ procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+ end;
+
+
+ TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
+ private type
+ // Protocol decorator that replays an already-read message header.
+ // This lets the target processor call ReadMessageBegin() and get an IMessage in
+ // the standard format, i.e. without the service name prepended to its Name.
+ TStoredMessageProtocol = class( TProtocolDecorator)
+ private
+ FMessageBegin : IMessage;  // header handed out by ReadMessageBegin
+ public
+ constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+ function ReadMessageBegin: IMessage; override;
+ end;
+
+ private
+ FServiceProcessorMap : TDictionary<String, IProcessor>;  // service name -> processor
+
+ public
+ constructor Create;
+ destructor Destroy; override;
+
+ // Registers a service processor with this TMultiplexedProcessorImpl.
+ // Requests are brokered to the individual services by using the service name
+ // to select the processor at request time.
+ procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+
+ { This implementation of Process performs the following steps:
+ - Read the beginning of the message.
+ - Extract the service name from the message.
+ - Use the service name to locate the appropriate processor.
+ - Dispatch to the processor, with a decorated instance of IProtocol
+ that allows ReadMessageBegin() to return the original message
+ (minus the service name prefix).
+ An exception is raised if the message type is not CALL or ONEWAY,
+ if no service name is found, or if the service is not registered.
+ }
+ function Process(const iprot, oprot : IProtocol) : Boolean;
+ end;
+
+
+implementation
+
+constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+begin
+ inherited Create( protocol);  // the decorator base class forwards everything else to "protocol"
+ FMessageBegin := aMsgBegin;  // kept so that ReadMessageBegin can hand it out later
+end;
+
+
+function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage;
+begin
+ result := FMessageBegin;  // hand out the stored header; nothing is read from the wrapped protocol
+end;
+
+
+constructor TMultiplexedProcessorImpl.Create;
+begin
+ inherited Create;
+ FServiceProcessorMap := TDictionary<string,IProcessor>.Create;  // owned by this instance, freed in Destroy
+end;
+
+
+destructor TMultiplexedProcessorImpl.Destroy;
+begin
+ try
+ FreeAndNil( FServiceProcessorMap);  // the map is a plain object owned by this instance
+ finally
+ inherited Destroy;  // runs even if freeing the map raised
+ end;
+end;
+
+
+procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor);
+begin
+ FServiceProcessorMap.Add( serviceName, processor);  // TDictionary.Add raises if serviceName is already registered
+end;
+
+
+function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean;
+// Reads the message header, resolves the addressed service and dispatches to its processor.
+const
+  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
+  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
+  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
+var
+  header, stripped : IMessage;
+  sepPos           : Integer;
+  serviceName      : string;
+  target           : IProcessor;
+  wrapped          : IProtocol;
+begin
+  // The header is pulled "off the wire" via the underlying protocol (e.g. TBinaryProtocol);
+  // only requests (call/oneway) can be dispatched.
+  header := iprot.readMessageBegin();
+  if not (header.Type_ in [TMessageType.Call, TMessageType.Oneway]) then
+    raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType,
+                                        ERROR_INVALID_MSGTYPE);
+
+  // The name is expected to be "<service><SEPARATOR><method>".
+  sepPos := Pos( TMultiplexedProtocol.SEPARATOR, header.Name);
+  if sepPos < 1 then
+    raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol,
+                                        Format(ERROR_INCOMPATIBLE_PROT,[header.Name]));
+
+  // Everything before the separator selects the registered processor.
+  serviceName := Copy( header.Name, 1, sepPos-1);
+  if not FServiceProcessorMap.TryGetValue( serviceName, target) then
+    raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError,
+                                        Format(ERROR_UNKNOWN_SERVICE,[serviceName]));
+
+  // Rebuild the header without the "<service><SEPARATOR>" prefix, then let the selected
+  // processor read that header from a decorator wrapped around the original protocol.
+  stripped := TMessageImpl.Create( Copy( header.Name, sepPos + Length(TMultiplexedProtocol.SEPARATOR), MAXINT), header.Type_, header.SeqID);
+  wrapped := TStoredMessageProtocol.Create( iprot, stripped);
+  result := target.process( wrapped, oprot);
+end;
+
+
+end.
+
diff --git a/lib/delphi/src/Thrift.Protocol.Multiplex.pas b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
new file mode 100644
index 0000000..2cd2401
--- /dev/null
+++ b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
@@ -0,0 +1,107 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Protocol.Multiplex;
+
+interface
+
+uses Thrift.Protocol;
+
+{ TMultiplexedProtocol is a protocol-independent concrete decorator
+ that allows a Thrift client to communicate with a multiplexing Thrift server,
+ by prepending the service name to the function name during function calls.
+
+ NOTE: THIS IS NOT USED BY SERVERS.
+ On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
+
+ This example uses a single socket transport to invoke two services
+ (Java syntax; the Delphi equivalent is TMultiplexedProtocol.Create( protocol, 'Calculator')):
+ TSocket transport = new TSocket("localhost", 9090);
+ transport.open();
+
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+ Calculator.Client service = new Calculator.Client(mp);
+
+ TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+ WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+
+ System.out.println(service.add(2,2));
+ System.out.println(service2.getTemperature());
+
+}
+
+type
+ TMultiplexedProtocol = class( TProtocolDecorator)
+ public const
+ { Used to delimit the service name from the function name }
+ SEPARATOR = ':';
+
+ private
+ FServiceName : String;  // prefix written in front of every call/oneway message name
+
+ public
+ { Wrap the specified protocol, allowing it to be used to communicate with a multiplexing server.
+ The service name is required as it is prepended to the message header so that the multiplexing
+ server can broker the function call to the proper service.
+
+ Args:
+ aProtocol ...... Your communication protocol of choice, e.g. TBinaryProtocol.
+ aServiceName ... The name of the service communicating via this protocol; must not be empty.
+ }
+ constructor Create( const aProtocol : IProtocol; const aServiceName : string);
+
+ { Prepends the service name to the function name, separated by SEPARATOR.
+ Only messages of type Call or Oneway are changed; all others are forwarded as they are.
+ }
+ procedure WriteMessageBegin( const msg: IMessage); override;
+ end;
+
+
+implementation
+
+
+constructor TMultiplexedProtocol.Create(const aProtocol: IProtocol; const aServiceName: string);
+begin
+ ASSERT( aServiceName <> '');  // an empty name would put a bare SEPARATOR in front of the method name
+ inherited Create(aProtocol);
+ FServiceName := aServiceName;
+end;
+
+
+procedure TMultiplexedProtocol.WriteMessageBegin( const msg: IMessage);
+// Requests (call/oneway) get "<service name><SEPARATOR>" put in front of their name;
+// every other message type is passed on to the wrapped protocol unchanged.
+var prefixed : IMessage;
+begin
+  if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin
+    inherited WriteMessageBegin( msg);
+    Exit;
+  end;
+
+  // The caller's message object is left untouched:
+  // a renamed copy with the same type and sequence id is written instead.
+  prefixed := TMessageImpl.Create( FServiceName + SEPARATOR + msg.Name, msg.Type_, msg.SeqID);
+  inherited WriteMessageBegin( prefixed);
+end;
+
+
+end.
+
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 4c1954c..b08458a 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -437,6 +437,69 @@
procedure SetReadLength( readLength: Integer );
end;
+
+ { TProtocolDecorator forwards all requests to an enclosed IProtocol instance,
+ providing a way to author concise concrete decorator subclasses: a subclass
+ overrides only the methods it wants to change. The decorator itself does not
+ (and should not) modify the behaviour of the enclosed protocol.
+ See p.175 of Design Patterns (by Gamma et al.)
+ }
+ TProtocolDecorator = class( TProtocolImpl)
+ private
+ FWrappedProtocol : IProtocol;  // target of every forwarded call
+
+ public
+ // Encloses the specified protocol.
+ // All operations will be forwarded to the given protocol. Must not be nil.
+ constructor Create( const aProtocol : IProtocol);
+
+ procedure WriteMessageBegin( const msg: IMessage); override;
+ procedure WriteMessageEnd; override;
+ procedure WriteStructBegin( const struc: IStruct); override;
+ procedure WriteStructEnd; override;
+ procedure WriteFieldBegin( const field: IField); override;
+ procedure WriteFieldEnd; override;
+ procedure WriteFieldStop; override;
+ procedure WriteMapBegin( const map: IMap); override;
+ procedure WriteMapEnd; override;
+ procedure WriteListBegin( const list: IList); override;
+ procedure WriteListEnd(); override;
+ procedure WriteSetBegin( const set_: ISet ); override;
+ procedure WriteSetEnd(); override;
+ procedure WriteBool( b: Boolean); override;
+ procedure WriteByte( b: ShortInt); override;
+ procedure WriteI16( i16: SmallInt); override;
+ procedure WriteI32( i32: Integer); override;
+ procedure WriteI64( const i64: Int64); override;
+ procedure WriteDouble( const d: Double); override;
+ procedure WriteString( const s: string ); override;
+ procedure WriteAnsiString( const s: AnsiString); override;
+ procedure WriteBinary( const b: TBytes); override;
+
+ function ReadMessageBegin: IMessage; override;
+ procedure ReadMessageEnd(); override;
+ function ReadStructBegin: IStruct; override;
+ procedure ReadStructEnd; override;
+ function ReadFieldBegin: IField; override;
+ procedure ReadFieldEnd(); override;
+ function ReadMapBegin: IMap; override;
+ procedure ReadMapEnd(); override;
+ function ReadListBegin: IList; override;
+ procedure ReadListEnd(); override;
+ function ReadSetBegin: ISet; override;
+ procedure ReadSetEnd(); override;
+ function ReadBool: Boolean; override;
+ function ReadByte: ShortInt; override;
+ function ReadI16: SmallInt; override;
+ function ReadI32: Integer; override;
+ function ReadI64: Int64; override;
+ function ReadDouble:Double; override;
+ function ReadBinary: TBytes; override;
+ function ReadString: string; override;
+ function ReadAnsiString: AnsiString; override;
+ end;
+
+
implementation
function ConvertInt64ToDouble( const n: Int64): Double;
@@ -1228,5 +1291,275 @@
Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite);
end;
+
+{ TProtocolDecorator }
+
+constructor TProtocolDecorator.Create( const aProtocol : IProtocol);
+begin
+ ASSERT( aProtocol <> nil);  // a decorator without a wrapped protocol cannot forward anything
+ inherited Create( aProtocol.Transport);  // the wrapped protocol's transport is passed to the base class
+ FWrappedProtocol := aProtocol;
+end;
+
+
+procedure TProtocolDecorator.WriteMessageBegin( const msg: IMessage);
+begin
+ FWrappedProtocol.WriteMessageBegin( msg);  // plain delegation; TMultiplexedProtocol overrides this method
+end;
+
+// The remaining Write* methods delegate to FWrappedProtocol in the same way, arguments unchanged.
+procedure TProtocolDecorator.WriteMessageEnd;
+begin
+ FWrappedProtocol.WriteMessageEnd;
+end;
+
+
+procedure TProtocolDecorator.WriteStructBegin( const struc: IStruct);
+begin
+ FWrappedProtocol.WriteStructBegin( struc);
+end;
+
+
+procedure TProtocolDecorator.WriteStructEnd;
+begin
+ FWrappedProtocol.WriteStructEnd;
+end;
+
+
+procedure TProtocolDecorator.WriteFieldBegin( const field: IField);
+begin
+ FWrappedProtocol.WriteFieldBegin( field);
+end;
+
+
+procedure TProtocolDecorator.WriteFieldEnd;
+begin
+ FWrappedProtocol.WriteFieldEnd;
+end;
+
+
+procedure TProtocolDecorator.WriteFieldStop;
+begin
+ FWrappedProtocol.WriteFieldStop;
+end;
+
+
+procedure TProtocolDecorator.WriteMapBegin( const map: IMap);
+begin
+ FWrappedProtocol.WriteMapBegin( map);
+end;
+
+
+procedure TProtocolDecorator.WriteMapEnd;
+begin
+ FWrappedProtocol.WriteMapEnd;
+end;
+
+
+procedure TProtocolDecorator.WriteListBegin( const list: IList);
+begin
+ FWrappedProtocol.WriteListBegin( list);
+end;
+
+
+procedure TProtocolDecorator.WriteListEnd();
+begin
+ FWrappedProtocol.WriteListEnd();
+end;
+
+
+procedure TProtocolDecorator.WriteSetBegin( const set_: ISet );
+begin
+ FWrappedProtocol.WriteSetBegin( set_);
+end;
+
+
+procedure TProtocolDecorator.WriteSetEnd();
+begin
+ FWrappedProtocol.WriteSetEnd();
+end;
+
+
+procedure TProtocolDecorator.WriteBool( b: Boolean);
+begin
+ FWrappedProtocol.WriteBool( b);
+end;
+
+
+procedure TProtocolDecorator.WriteByte( b: ShortInt);
+begin
+ FWrappedProtocol.WriteByte( b);
+end;
+
+
+procedure TProtocolDecorator.WriteI16( i16: SmallInt);
+begin
+ FWrappedProtocol.WriteI16( i16);
+end;
+
+
+procedure TProtocolDecorator.WriteI32( i32: Integer);
+begin
+ FWrappedProtocol.WriteI32( i32);
+end;
+
+
+procedure TProtocolDecorator.WriteI64( const i64: Int64);
+begin
+ FWrappedProtocol.WriteI64( i64);
+end;
+
+
+procedure TProtocolDecorator.WriteDouble( const d: Double);
+begin
+ FWrappedProtocol.WriteDouble( d);
+end;
+
+
+procedure TProtocolDecorator.WriteString( const s: string );
+begin
+ FWrappedProtocol.WriteString( s);
+end;
+
+
+procedure TProtocolDecorator.WriteAnsiString( const s: AnsiString);
+begin
+ FWrappedProtocol.WriteAnsiString( s);
+end;
+
+
+procedure TProtocolDecorator.WriteBinary( const b: TBytes);
+begin
+ FWrappedProtocol.WriteBinary( b);
+end;
+
+
+function TProtocolDecorator.ReadMessageBegin: IMessage;
+begin
+ result := FWrappedProtocol.ReadMessageBegin;  // plain delegation; TStoredMessageProtocol overrides this method
+end;
+
+// The remaining Read* methods delegate to FWrappedProtocol in the same way and return its result unchanged.
+procedure TProtocolDecorator.ReadMessageEnd();
+begin
+ FWrappedProtocol.ReadMessageEnd();
+end;
+
+
+function TProtocolDecorator.ReadStructBegin: IStruct;
+begin
+ result := FWrappedProtocol.ReadStructBegin;
+end;
+
+
+procedure TProtocolDecorator.ReadStructEnd;
+begin
+ FWrappedProtocol.ReadStructEnd;
+end;
+
+
+function TProtocolDecorator.ReadFieldBegin: IField;
+begin
+ result := FWrappedProtocol.ReadFieldBegin;
+end;
+
+
+procedure TProtocolDecorator.ReadFieldEnd();
+begin
+ FWrappedProtocol.ReadFieldEnd();
+end;
+
+
+function TProtocolDecorator.ReadMapBegin: IMap;
+begin
+ result := FWrappedProtocol.ReadMapBegin;
+end;
+
+
+procedure TProtocolDecorator.ReadMapEnd();
+begin
+ FWrappedProtocol.ReadMapEnd();
+end;
+
+
+function TProtocolDecorator.ReadListBegin: IList;
+begin
+ result := FWrappedProtocol.ReadListBegin;
+end;
+
+
+procedure TProtocolDecorator.ReadListEnd();
+begin
+ FWrappedProtocol.ReadListEnd();
+end;
+
+
+function TProtocolDecorator.ReadSetBegin: ISet;
+begin
+ result := FWrappedProtocol.ReadSetBegin;
+end;
+
+
+procedure TProtocolDecorator.ReadSetEnd();
+begin
+ FWrappedProtocol.ReadSetEnd();
+end;
+
+
+function TProtocolDecorator.ReadBool: Boolean;
+begin
+ result := FWrappedProtocol.ReadBool;
+end;
+
+
+function TProtocolDecorator.ReadByte: ShortInt;
+begin
+ result := FWrappedProtocol.ReadByte;
+end;
+
+
+function TProtocolDecorator.ReadI16: SmallInt;
+begin
+ result := FWrappedProtocol.ReadI16;
+end;
+
+
+function TProtocolDecorator.ReadI32: Integer;
+begin
+ result := FWrappedProtocol.ReadI32;
+end;
+
+
+function TProtocolDecorator.ReadI64: Int64;
+begin
+ result := FWrappedProtocol.ReadI64;
+end;
+
+
+function TProtocolDecorator.ReadDouble:Double;
+begin
+ result := FWrappedProtocol.ReadDouble;
+end;
+
+
+function TProtocolDecorator.ReadBinary: TBytes;
+begin
+ result := FWrappedProtocol.ReadBinary;
+end;
+
+
+function TProtocolDecorator.ReadString: string;
+begin
+ result := FWrappedProtocol.ReadString;
+end;
+
+
+function TProtocolDecorator.ReadAnsiString: AnsiString;
+begin
+ result := FWrappedProtocol.ReadAnsiString;
+end;
+
+
+
end.
diff --git a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
new file mode 100644
index 0000000..2cc7ab0
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
@@ -0,0 +1,130 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Multiplex.Client.Main;
+
+{.$DEFINE StressTest} // activate to stress-test the server with frequent connects/disconnects
+{.$DEFINE PerfTest} // activate to activate the performance test
+
+interface
+
+uses
+ Windows, SysUtils, Classes,
+ DateUtils,
+ Generics.Collections,
+ Thrift,
+ Thrift.Protocol,
+ Thrift.Protocol.Multiplex,
+ Thrift.Transport.Pipes,
+ Thrift.Transport,
+ Thrift.Stream,
+ Thrift.Collections,
+ Benchmark, // in gen-delphi folder
+ Aggr, // in gen-delphi folder
+ Multiplex.Test.Common;
+
+type
+ TTestClient = class
+ protected
+ FProtocol : IProtocol;  // base protocol created in Setup; Run wraps it once per service
+
+ procedure ParseArgs( const args: array of string);
+ procedure Setup;
+ procedure Run;
+ public
+ constructor Create( const args: array of string);
+ class procedure Execute( const args: array of string);  // creates a client, runs it, frees it
+ end;
+
+implementation
+
+
+type
+ IServiceClient = interface  // NOTE(review): not referenced anywhere else in this unit
+ ['{7745C1C2-AB20-43BA-B6F0-08BF92DE0BAC}']
+ procedure Test;
+ end;
+
+//--- TTestClient -------------------------------------
+
+
+class procedure TTestClient.Execute( const args: array of string);
+var client : TTestClient;
+begin
+ client := TTestClient.Create(args);  // checks the arguments and sets up the connection
+ try
+ client.Run;
+ finally
+ client.Free;  // plain object reference: must be freed explicitly
+ end;
+end;
+
+
+constructor TTestClient.Create( const args: array of string);
+begin
+ ParseArgs(args);  // raises if any argument was passed
+ Setup;  // opens the transport and creates FProtocol
+end;
+
+
+procedure TTestClient.ParseArgs( const args: array of string);
+// The multiplex test client has no command line options yet: any argument is an error.
+begin
+  if Length(args) > 0 then raise Exception.Create('No args accepted so far');
+end;
+
+
+procedure TTestClient.Setup;
+var socket, framed : ITransport;  // the socket transport and the framing layer on top of it
+begin
+  socket := TSocketImpl.Create( 'localhost', 9090);
+  framed := TFramedTransportImpl.Create( socket);
+  framed.Open;
+  FProtocol := TBinaryProtocolImpl.Create( framed, TRUE, TRUE);  // Run wraps this once per service
+end;
+
+
+procedure TTestClient.Run;
+// Feeds fibonacci(1..10) from the benchmark service into the aggregator service,
+// then prints the aggregated values. Both clients share FProtocol via multiplexing.
+var benchProt, aggrProt : IProtocol;
+    bench    : TBenchmarkService.Iface;
+    aggr     : TAggr.Iface;
+    n, value : Integer;
+begin
+  try
+    // one multiplexing decorator per service, both on top of the same base protocol
+    benchProt := TMultiplexedProtocol.Create( FProtocol, NAME_BENCHMARKSERVICE);
+    bench     := TBenchmarkService.TClient.Create( benchProt);
+    aggrProt  := TMultiplexedProtocol.Create( FProtocol, NAME_AGGR);
+    aggr      := TAggr.TClient.Create( aggrProt);
+
+    for n := 1 to 10 do aggr.addValue( bench.fibonacci( n));
+
+    for value in aggr.getValues do Write( IntToStr( value)+' ');
+    WriteLn;
+  except
+    on e:Exception do Writeln(#10+e.Message);
+  end;
+end;
+
+
+end.
+
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
new file mode 100644
index 0000000..4f5cd13
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
@@ -0,0 +1,201 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Multiplex.Server.Main;
+
+{$WARN SYMBOL_PLATFORM OFF}
+
+{.$DEFINE RunEndless} // activate to interactively stress-test the server stop routines via Ctrl+C
+
+interface
+
+uses
+ Windows, SysUtils,
+ Generics.Collections,
+ Thrift.Console,
+ Thrift.Server,
+ Thrift.Transport,
+ Thrift.Transport.Pipes,
+ Thrift.Protocol,
+ Thrift.Protocol.Multiplex,
+ Thrift.Processor.Multiplex,
+ Thrift.Collections,
+ Thrift.Utils,
+ Thrift,
+ Benchmark, // in gen-delphi folder
+ Aggr, // in gen-delphi folder
+ Multiplex.Test.Common,
+ Contnrs;
+
+type
+ TTestServer = class
+ public type
+ ITestHandler = interface
+ ['{CAE09AAB-80FB-48E9-B3A8-7F9B96F5419A}']
+ procedure SetServer( const AServer : IServer );
+ end;
+ // Common base class of both service handlers: stores the server reference.
+ protected type
+ TTestHandlerImpl = class( TInterfacedObject, ITestHandler)
+ private
+ FServer : IServer;  // set and cleared again by TTestServer.Execute
+ protected
+ // ITestHandler
+ procedure SetServer( const AServer : IServer );
+
+ property Server : IServer read FServer write SetServer;
+ end;
+ // Handler of the benchmark service.
+ TBenchmarkServiceImpl = class( TTestHandlerImpl, TBenchmarkService.Iface)
+ protected
+ // TBenchmarkService.Iface
+ function fibonacci(n: ShortInt): Integer;
+ end;
+
+ // Handler of the aggregator service: collects integer values in a list.
+ TAggrImpl = class( TTestHandlerImpl, TAggr.Iface)
+ protected
+ FList : IThriftList<Integer>;  // interface reference to the collected values
+
+ // TAggr.Iface
+ procedure addValue(value: Integer);
+ function getValues(): IThriftList<Integer>;
+ public
+ constructor Create;
+ destructor Destroy; override;
+ end;
+
+ public
+ class procedure Execute( const args: array of string);
+ end;
+
+
+implementation
+
+
+{ TTestServer.TTestHandlerImpl }
+
+procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);
+begin
+ FServer := AServer;  // nil is passed by TTestServer.Execute to release the reference again
+end;
+
+
+{ TTestServer.TBenchmarkServiceImpl }
+
+function TTestServer.TBenchmarkServiceImpl.fibonacci(n: ShortInt): Integer;
+// Iterative Fibonacci with fibonacci(0) = fibonacci(1) = 1; any n <= 0 yields 1.
+var older, newer, step : Integer;
+begin
+  older  := 0;
+  result := 1;
+  for step := 1 to n do begin
+    newer  := older + result;
+    older  := result;
+    result := newer;
+  end;
+end;
+
+{ TTestServer.TAggrImpl }
+
+constructor TTestServer.TAggrImpl.Create;
+begin
+ inherited Create;
+ FList := TThriftListImpl<Integer>.Create;  // values are appended by addValue
+end;
+
+
+destructor TTestServer.TAggrImpl.Destroy;
+begin
+  try
+    FList := nil;  // interface reference: release via nil (FreeAndNil is only valid for object references)
+  finally
+    inherited Destroy;
+  end;
+end;
+
+
+procedure TTestServer.TAggrImpl.addValue(value: Integer);
+begin
+ FList.Add( value);  // appended to the list that getValues returns
+end;
+
+
+function TTestServer.TAggrImpl.getValues(): IThriftList<Integer>;
+begin
+ result := FList;  // returns the internal list itself, not a copy
+end;
+
+
+{ TTestServer }
+
+class procedure TTestServer.Execute( const args: array of string);
+// Runs the multiplex test server on port 9090 until serve() returns. "args" is not evaluated.
+var
+  TransportFactory : ITransportFactory;
+  ProtocolFactory  : IProtocolFactory;
+  ServerTrans      : IServerTransport;
+  benchHandler     : TBenchmarkService.Iface;
+  aggrHandler      : TAggr.Iface;
+  benchProcessor   : IProcessor;
+  aggrProcessor    : IProcessor;
+  multiplex        : IMultiplexedProcessor;
+  ServerEngine     : IServer;
+begin
+  try
+    // binary protocol over framed transport, server socket on port 9090
+    ProtocolFactory  := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE);
+    ServerTrans      := TServerSocketImpl.Create( 9090, 0, FALSE);
+    TransportFactory := TFramedTransportImpl.TFactory.Create;
+
+    // one handler plus generated processor per service
+    benchHandler   := TBenchmarkServiceImpl.Create;
+    benchProcessor := TBenchmarkService.TProcessorImpl.Create( benchHandler);
+    aggrHandler    := TAggrImpl.Create;
+    aggrProcessor  := TAggr.TProcessorImpl.Create( aggrHandler);
+
+    // both processors are reachable through the multiplexing processor, selected by service name
+    multiplex := TMultiplexedProcessorImpl.Create;
+    multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor);
+    multiplex.RegisterProcessor( NAME_AGGR, aggrProcessor);
+
+    ServerEngine := TSimpleServer.Create( multiplex, ServerTrans,
+                                          TransportFactory, ProtocolFactory);
+
+    // The handlers hold a reference to the server only while it is serving.
+    // try/finally makes sure these references are dropped even if serve() raises.
+    try
+      (benchHandler as ITestHandler).SetServer( ServerEngine);
+      (aggrHandler as ITestHandler).SetServer( ServerEngine);
+      Console.WriteLine('Starting the server ...');
+      ServerEngine.serve();
+    finally
+      (benchHandler as ITestHandler).SetServer( nil);
+      (aggrHandler as ITestHandler).SetServer( nil);
+    end;
+
+  except
+    on E: Exception do Console.WriteLine( E.Message);  // WriteLine keeps the message apart from 'done.'
+  end;
+  Console.WriteLine( 'done.');
+end;
+
+
+end.
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
new file mode 100644
index 0000000..23e296a
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
@@ -0,0 +1,65 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+
+program Multiplex.Test.Client;
+
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils,
+ Multiplex.Client.Main in 'Multiplex.Client.Main.pas',
+ Thrift in '..\..\src\Thrift.pas',
+ Thrift.Transport in '..\..\src\Thrift.Transport.pas',
+ Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
+ Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
+ Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',
+ Thrift.Collections in '..\..\src\Thrift.Collections.pas',
+ Thrift.Server in '..\..\src\Thrift.Server.pas',
+ Thrift.Stream in '..\..\src\Thrift.Stream.pas',
+ Thrift.Console in '..\..\src\Thrift.Console.pas',
+ Thrift.Utils in '..\..\src\Thrift.Utils.pas';
+
+var
+  args : array of string;
+  i    : Integer;
+
+// Program entry: collects the command line arguments (ParamStr(0), the executable
+// name, is skipped) and hands them to the test client. Any exception is printed
+// and reported to the caller through a non-zero exit code.
+begin
+  try
+    Writeln( 'Multiplex TestClient '+Thrift.Version);
+
+    // ParamStr is 1-based, args is 0-based
+    SetLength( args, ParamCount);
+    for i := 0 to High(args) do begin
+      args[i] := ParamStr( i+1);
+    end;
+
+    TTestClient.Execute( args );
+    Readln;  // wait for ENTER before the program ends
+  except
+    on E: Exception do begin
+      Writeln(E.ClassName, ': ', E.Message);
+      ExitCode := $FFFF;
+    end;
+  end;
+end.
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
new file mode 100644
index 0000000..231c3ad
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
@@ -0,0 +1,35 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Multiplex.Test.Common;
+
+interface
+
+const
+ NAME_BENCHMARKSERVICE = 'BenchmarkService';  // service name used by both test client and test server
+ NAME_AGGR = 'Aggr';  // service name used by both test client and test server
+
+
+implementation
+
+// nothing to implement
+
+end.
+
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
new file mode 100644
index 0000000..9da1cdc
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
@@ -0,0 +1,65 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+program Multiplex.Test.Server;
+
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils,
+ Multiplex.Server.Main in 'Multiplex.Server.Main.pas',
+ Thrift in '..\..\src\Thrift.pas',
+ Thrift.Transport in '..\..\src\Thrift.Transport.pas',
+ Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
+ Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
+ Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',
+ Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas',
+ Thrift.Collections in '..\..\src\Thrift.Collections.pas',
+ Thrift.Server in '..\..\src\Thrift.Server.pas',
+ Thrift.Console in '..\..\src\Thrift.Console.pas',
+ Thrift.Utils in '..\..\src\Thrift.Utils.pas',
+ Thrift.Stream in '..\..\src\Thrift.Stream.pas';
+
+var
+  args : array of string;
+  i    : Integer;
+
+// Program entry: collects the command line arguments (ParamStr(0), the executable name,
+// is skipped), runs the test server and reports any exception through the exit code.
+begin
+  try
+    Writeln( 'Multiplex TestServer '+Thrift.Version);
+
+    // ParamStr is 1-based, args is 0-based
+    SetLength( args, ParamCount);
+    for i := 0 to High(args) do args[i] := ParamStr( i+1);
+
+    TTestServer.Execute( args );
+    Writeln('Press ENTER to close ... ');
+    Readln;
+  except
+    on E: Exception do begin
+      Writeln(E.ClassName, ': ', E.Message);
+      ExitCode := $FFFF;  // same failure code as the multiplex test client program
+    end;
+  end;
+end.
+
+
+