THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Jens Geyer
diff --git a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
new file mode 100644
index 0000000..2cc7ab0
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
@@ -0,0 +1,130 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Multiplex.Client.Main;
+
+{.$DEFINE StressTest}   // activate to stress-test the server with frequent connects/disconnects
+{.$DEFINE PerfTest}     // activate to run the performance test
+
+interface
+
+uses
+  Windows, SysUtils, Classes,
+  DateUtils,
+  Generics.Collections,
+  Thrift,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex,
+  Thrift.Transport.Pipes,
+  Thrift.Transport,
+  Thrift.Stream,
+  Thrift.Collections,
+  Benchmark,  // in gen-delphi folder
+  Aggr,       // in gen-delphi folder
+  Multiplex.Test.Common;
+
+type
+  TTestClient = class   // console test client that talks to both multiplexed services
+  protected
+    FProtocol : IProtocol;   // base protocol created in Setup; wrapped once per service in Run
+
+    procedure ParseArgs( const args: array of string);   // raises if any argument is passed
+    procedure Setup;   // opens the transport and creates FProtocol
+    procedure Run;     // performs the test calls and prints the values
+  public
+    constructor Create( const args: array of string);   // calls ParseArgs, then Setup
+    class procedure Execute( const args: array of string);   // creates a client, runs it, frees it
+  end;
+
+implementation
+
+
+type
+  IServiceClient = interface   // NOTE(review): not referenced anywhere else in this unit - confirm it is still needed
+    ['{7745C1C2-AB20-43BA-B6F0-08BF92DE0BAC}']

+    procedure Test;

+  end;
+
+//--- TTestClient -------------------------------------
+
+
+class procedure TTestClient.Execute( const args: array of string);  // entry point: create a client, run it once, free it
+var client : TTestClient;
+begin
+  client := TTestClient.Create(args);  // the constructor runs ParseArgs and Setup
+  try
+    client.Run;
+  finally

+    client.Free;  // executed even if Run raises

+  end;
+end;
+
+
+constructor TTestClient.Create( const args: array of string);  // validates the arguments, then connects
+begin
+  ParseArgs(args);  // raises if any argument was supplied
+  Setup;            // opens the transport and sets FProtocol
+end;
+
+
+procedure TTestClient.ParseArgs( const args: array of string);  // rejects every command line argument
+begin
+  if Length(args) <> 0   // only an empty argument list is accepted
+  then raise Exception.Create('No args accepted so far');
+end;
+
+
+procedure TTestClient.Setup;  // connects to localhost:9090 and stores the resulting protocol in FProtocol
+var trans : ITransport;
+begin
+  trans := TSocketImpl.Create( 'localhost', 9090);
+  trans := TFramedTransportImpl.Create( trans);  // the socket is wrapped into a framed transport
+  trans.Open;
+  FProtocol := TBinaryProtocolImpl.Create( trans, TRUE, TRUE);  // NOTE(review): the two TRUE flags are presumably strict read/write - confirm
+end;
+
+
+procedure TTestClient.Run;  // calls both services over the shared FProtocol and prints the collected values
+var bench : TBenchmarkService.Iface;
+    aggr  : TAggr.Iface;
+    multiplex : IProtocol;
+    i         : Integer;
+begin
+  try
+    multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_BENCHMARKSERVICE);  // one wrapper per service name
+    bench     := TBenchmarkService.TClient.Create( multiplex);
+
+    multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_AGGR);  // same base protocol, different service name
+    aggr      := TAggr.TClient.Create( multiplex);
+
+    for i := 1 to 10   // pass the results of fibonacci(1) .. fibonacci(10) to addValue
+    do aggr.addValue( bench.fibonacci(i));
+
+    for i in aggr.getValues   // print every value returned by getValues, each followed by a blank
+    do Write(IntToStr(i)+' ');
+    WriteLn;
+  except
+    on e:Exception do Writeln(#10+e.Message);  // errors are printed only, they are not re-raised

+  end;

+end;
+
+
+end.
+
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
new file mode 100644
index 0000000..4f5cd13
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
@@ -0,0 +1,201 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Multiplex.Server.Main;
+
+{$WARN SYMBOL_PLATFORM OFF}
+
+{.$DEFINE RunEndless}   // activate to interactively stress-test the server stop routines via Ctrl+C
+
+interface
+
+uses
+  Windows, SysUtils,
+  Generics.Collections,
+  Thrift.Console,
+  Thrift.Server,
+  Thrift.Transport,
+  Thrift.Transport.Pipes,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex,
+  Thrift.Processor.Multiplex,
+  Thrift.Collections,
+  Thrift.Utils,
+  Thrift,
+  Benchmark,  // in gen-delphi folder
+  Aggr,       // in gen-delphi folder
+  Multiplex.Test.Common,
+  Contnrs;
+
+type
+  TTestServer = class   // multiplexed test server: nested handler types plus the Execute entry point
+  public type
+    ITestHandler = interface   // used by Execute to hand the server instance to a handler
+      ['{CAE09AAB-80FB-48E9-B3A8-7F9B96F5419A}']
+      procedure SetServer( const AServer : IServer );
+    end;
+
+  protected type
+    TTestHandlerImpl = class( TInterfacedObject, ITestHandler)   // common base class of both service handlers
+    private
+      FServer : IServer;   // set via SetServer
+    protected
+      // ITestHandler
+      procedure SetServer( const AServer : IServer );
+
+      property Server : IServer read FServer write SetServer;
+    end;
+
+    TBenchmarkServiceImpl = class( TTestHandlerImpl, TBenchmarkService.Iface)   // handler for the benchmark service
+    protected
+      // TBenchmarkService.Iface
+      function fibonacci(n: ShortInt): Integer;
+    end;
+
+
+    TAggrImpl = class( TTestHandlerImpl, TAggr.Iface)   // handler for the Aggr service; keeps the values in FList
+    protected
+      FList : IThriftList<Integer>;   // interface reference, created in the constructor
+    
+      // TAggr.Iface
+      procedure addValue(value: Integer);
+      function getValues(): IThriftList<Integer>;

+    public

+      constructor Create;

+      destructor Destroy;  override;

+    end;

+
+  public
+    class procedure Execute( const args: array of string);   // sets up and runs the server
+  end;
+
+  
+implementation
+
+
+{ TTestServer.TTestHandlerImpl }
+
+procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);  // stores the server reference; passing nil clears it
+begin
+  FServer := AServer;
+end;
+
+
+{ TTestServer.TBenchmarkServiceImpl }
+
+function TTestServer.TBenchmarkServiceImpl.fibonacci(n: ShortInt): Integer;  // iterative Fibonacci: yields 1, 1, 2, 3, 5 for n = 0, 1, 2, 3, 4
+var prev, next : Integer;
+begin
+  prev   := 0;
+  result := 1;   // also the final result for n <= 0, because the loop body never runs
+  while n > 0 do begin   // NOTE(review): the sum presumably exceeds the Integer range for large n - confirm callers stay small
+    next   := result + prev;

+    prev   := result;

+    result := next;

+    Dec(n);

+  end;
+end;
+
+{ TTestServer.TAggrImpl }
+
+constructor TTestServer.TAggrImpl.Create;  // creates the handler with an empty value list
+begin

+  inherited Create;
+  FList := TThriftListImpl<Integer>.Create;  // list that addValue appends to
+end;
+
+

+destructor TTestServer.TAggrImpl.Destroy;  // releases the value list, then runs the inherited destructor

+begin
+  try
+    FList := nil;  // FList is an interface reference: release it by assignment, FreeAndNil() would treat it as a TObject

+  finally

+    inherited Destroy;

+  end;
+end;
+
+

+procedure TTestServer.TAggrImpl.addValue(value: Integer);  // appends value to the internal list
+begin
+  FList.Add( value);
+end;
+
+

+function TTestServer.TAggrImpl.getValues(): IThriftList<Integer>;  // returns the internal list itself, not a copy
+begin
+  result := FList;
+end;
+
+

+{ TTestServer }    
+
+class procedure TTestServer.Execute( const args: array of string);  // builds the multiplexed server and runs it; args is not evaluated here
+var
+  TransportFactory : ITransportFactory;
+  ProtocolFactory  : IProtocolFactory;
+  ServerTrans      : IServerTransport;
+  benchHandler     : TBenchmarkService.Iface;
+  aggrHandler      : TAggr.Iface;
+  benchProcessor   : IProcessor;
+  aggrProcessor    : IProcessor;
+  multiplex        : IMultiplexedProcessor;
+  ServerEngine     : IServer;
+begin
+  try
+    // create protocol factory, default to BinaryProtocol
+    ProtocolFactory  := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE);
+    servertrans      := TServerSocketImpl.Create( 9090, 0, FALSE);  // port 9090; NOTE(review): meaning of 0 / FALSE is not visible here - confirm
+    TransportFactory := TFramedTransportImpl.TFactory.Create;
+
+    benchHandler     := TBenchmarkServiceImpl.Create;  // one handler plus one processor per service
+    benchProcessor   := TBenchmarkService.TProcessorImpl.Create( benchHandler);
+
+    aggrHandler      := TAggrImpl.Create;
+    aggrProcessor    := TAggr.TProcessorImpl.Create( aggrHandler);
+
+    multiplex        := TMultiplexedProcessorImpl.Create;
+    multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor);  // each processor is registered under its service name
+    multiplex.RegisterProcessor( NAME_AGGR,  aggrProcessor);
+
+    ServerEngine := TSimpleServer.Create( multiplex,
+                                          ServerTrans,
+                                          TransportFactory,
+                                          ProtocolFactory);
+
+    (benchHandler as ITestHandler).SetServer( ServerEngine);  // give both handlers access to the server
+    (aggrHandler as ITestHandler).SetServer( ServerEngine);
+
+    Console.WriteLine('Starting the server ...');
+    ServerEngine.serve();  // NOTE(review): presumably blocks until the server is stopped - confirm
+
+    (benchHandler as ITestHandler).SetServer( nil);  // drop the handlers' server references again
+    (aggrHandler as ITestHandler).SetServer( nil);
+
+  except
+    on E: Exception do
+    begin
+      Console.Write( E.Message);  // errors are printed only, they are not re-raised
+    end;
+  end;
+  Console.WriteLine( 'done.');  // written after both normal and failed runs
+end;
+
+
+end.
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
new file mode 100644
index 0000000..23e296a
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
@@ -0,0 +1,65 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+
+program Multiplex.Test.Client;
+
+{$APPTYPE CONSOLE}
+
+uses
+  SysUtils,

+  Multiplex.Client.Main in 'Multiplex.Client.Main.pas',

+  Thrift in '..\..\src\Thrift.pas',

+  Thrift.Transport in '..\..\src\Thrift.Transport.pas',

+  Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',

+  Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',

+  Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',

+  Thrift.Collections in '..\..\src\Thrift.Collections.pas',

+  Thrift.Server in '..\..\src\Thrift.Server.pas',

+  Thrift.Stream in '..\..\src\Thrift.Stream.pas',

+  Thrift.Console in '..\..\src\Thrift.Console.pas',

+  Thrift.Utils in '..\..\src\Thrift.Utils.pas';

+

+// Program entry point: copies the command line parameters into a dynamic
+// array, hands them to TTestClient.Execute and then waits for ENTER.
+// Any exception escaping Execute is printed and turned into exit code $FFFF.
+var
+  nParamCount : Integer;
+  args : array of string;
+  i : Integer;
+
+begin
+  try
+    Writeln( 'Multiplex TestClient '+Thrift.Version);
+    nParamCount := ParamCount;
+    SetLength( args, nParamCount);
+    for i := 1 to nParamCount do
+    begin
+      args[i-1] := ParamStr( i );  // ParamStr is 1-based, the dynamic array is 0-based
+    end;
+    TTestClient.Execute( args );
+    Readln;
+  except
+    on E: Exception do begin
+      Writeln(E.ClassName, ': ', E.Message);
+      ExitCode := $FFFF;
+    end;
+  end;
+end.
+
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
new file mode 100644
index 0000000..231c3ad
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
@@ -0,0 +1,35 @@
+(*

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements. See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership. The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License. You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied. See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ *)

+

+unit Multiplex.Test.Common;

+

+interface

+

+const

+  NAME_BENCHMARKSERVICE = 'BenchmarkService';   // multiplex service name of the benchmark service

+  NAME_AGGR             = 'Aggr';               // multiplex service name of the Aggr service

+

+

+implementation

+

+// nothing to implement

+

+end.

+

+

diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
new file mode 100644
index 0000000..9da1cdc
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
@@ -0,0 +1,65 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+program Multiplex.Test.Server;
+
+{$APPTYPE CONSOLE}
+
+uses
+  SysUtils,

+  Multiplex.Server.Main in 'Multiplex.Server.Main.pas',

+  Thrift in '..\..\src\Thrift.pas',

+  Thrift.Transport in '..\..\src\Thrift.Transport.pas',

+  Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',

+  Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',

+  Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',

+  Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas',

+  Thrift.Collections in '..\..\src\Thrift.Collections.pas',

+  Thrift.Server in '..\..\src\Thrift.Server.pas',

+  Thrift.Console in '..\..\src\Thrift.Console.pas',

+  Thrift.Utils in '..\..\src\Thrift.Utils.pas',

+  Thrift.Stream in '..\..\src\Thrift.Stream.pas';

+

+// Program entry: copies the command line into a dynamic array and runs the test server.
+var
+  nParamCount : Integer;
+  args : array of string;
+  i : Integer;
+
+begin
+  try
+    Writeln( 'Multiplex TestServer '+Thrift.Version);
+    nParamCount := ParamCount;
+    SetLength( args, nParamCount);
+    for i := 1 to nParamCount do
+    begin
+      args[i-1] := ParamStr( i );  // ParamStr is 1-based, the dynamic array is 0-based
+    end;
+    TTestServer.Execute( args );
+    Writeln('Press ENTER to close ... '); Readln;
+  except
+    on E: Exception do begin
+      Writeln(E.ClassName, ': ', E.Message);
+      ExitCode := $FFFF;  // report the failure to the calling process, as the test client program does
+    end;
+  end;
+end.
+
+
+