THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas
new file mode 100644
index 0000000..b771d43
--- /dev/null
+++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas
@@ -0,0 +1,182 @@
+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements. See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership. The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License. You may obtain a copy of the License at

+ *

+ *

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an


+ * KIND, either express or implied. See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ *)


+unit Thrift.Processor.Multiplex;
+  SysUtils,
+  Generics.Collections,
+  Thrift,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex;
+{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+  To do so, you instantiate the processor and then register additional processors with it,
+  as shown in the following example:
+     TMultiplexedProcessor processor = new TMultiplexedProcessor();
+     processor.registerProcessor(
+         "Calculator",
+         new Calculator.Processor(new CalculatorHandler()));
+     processor.registerProcessor(
+         "WeatherReport",
+         new WeatherReport.Processor(new WeatherReportHandler()));
+     TServerTransport t = new TServerSocket(9090);
+     TSimpleServer server = new TSimpleServer(processor, t);
+     server.serve();
+  IMultiplexedProcessor = interface( IProcessor)
+    ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']

+    // Register a service with this TMultiplexedProcessor.  This allows us
+    // to broker requests to individual services by using the service name
+    // to select them at request time.
+    procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+  end;
+  TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
+  private type
+    // Our goal was to work with any protocol.  In order to do that, we needed
+    // to allow them to call readMessageBegin() and get a TMessage in exactly
+    // the standard format, without the service name prepended to
+    TStoredMessageProtocol = class( TProtocolDecorator)
+    private
+      FMessageBegin : IMessage;
+    public
+      constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+      function ReadMessageBegin: IMessage; override;
+    end;
+  private
+    FServiceProcessorMap : TDictionary<String, IProcessor>;
+  public
+    constructor Create;
+    destructor Destroy;  override;
+    // Register a service with this TMultiplexedProcessorImpl.  This allows us
+    // to broker requests to individual services by using the service name
+    // to select them at request time.
+    procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+    { This implementation of process performs the following steps:
+      - Read the beginning of the message.
+      - Extract the service name from the message.
+      - Using the service name to locate the appropriate processor.
+      - Dispatch to the processor, with a decorated instance of TProtocol
+        that allows readMessageBegin() to return the original TMessage.
+      An exception is thrown if the message type is not CALL or ONEWAY
+      or if the service is unknown (or not properly registered).
+    }
+    function Process(const iprot, oprot : IProtocol) : Boolean;
+  end;
+constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+  inherited Create( protocol);
+  FMessageBegin := aMsgBegin;
+function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage;
+  result := FMessageBegin;
+constructor TMultiplexedProcessorImpl.Create;
+  inherited Create;
+  FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
+destructor TMultiplexedProcessorImpl.Destroy;
+  try
+    FreeAndNil( FServiceProcessorMap);
+  finally
+    inherited Destroy;
+  end;
+procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor);
+  FServiceProcessorMap.Add( serviceName, processor);
+function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean;
+var msg, newMsg : IMessage;
+    idx         : Integer;
+    sService    : string;
+    processor   : IProcessor;
+    protocol    : IProtocol;
+  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
+  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
+  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
+  // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header.
+  // This pulls the message "off the wire", which we'll deal with at the end of this method.
+  msg := iprot.readMessageBegin();
+  if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway])
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType,
+                                           ERROR_INVALID_MSGTYPE);
+  // Extract the service name
+  idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name);
+  if idx < 1
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol,
+                                           Format(ERROR_INCOMPATIBLE_PROT,[msg.Name]));
+  // Create a new TMessage, something that can be consumed by any TProtocol
+  sService := Copy( msg.Name, 1, idx-1);
+  if not FServiceProcessorMap.TryGetValue( sService, processor)
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError,
+                                           Format(ERROR_UNKNOWN_SERVICE,[sService]));
+  // Create a new TMessage, removing the service name
+  Inc( idx, Length(TMultiplexedProtocol.SEPARATOR));
+  newMsg := TMessageImpl.Create( Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID);
+  // Dispatch processing to the stored processor
+  protocol := TStoredMessageProtocol.Create( iprot, newMsg);
+  result   := processor.process( protocol, oprot);
diff --git a/lib/delphi/src/Thrift.Protocol.Multiplex.pas b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
new file mode 100644
index 0000000..2cd2401
--- /dev/null
+++ b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
@@ -0,0 +1,107 @@
+unit Thrift.Protocol.Multiplex;
+uses Thrift.Protocol;
+{ TMultiplexedProtocol is a protocol-independent concrete decorator
+  that allows a Thrift client to communicate with a multiplexing Thrift server,
+  by prepending the service name to the function name during function calls.
+  On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
+  This example uses a single socket transport to invoke two services:
+      TSocket transport = new TSocket("localhost", 9090);
+      TBinaryProtocol protocol = new TBinaryProtocol(transport);
+      TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+      Calculator.Client service = new Calculator.Client(mp);
+      TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+      WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+      System.out.println(service.add(2,2));
+      System.out.println(service2.getTemperature());
+  TMultiplexedProtocol = class( TProtocolDecorator)
+  public const
+    {  Used to delimit the service name from the function name }
+    SEPARATOR = ':';
+  private
+     FServiceName : String;
+  public
+    { Wrap the specified protocol, allowing it to be used to communicate with a multiplexing server.
+      The serviceName is required as it is prepended to the message header so that the multiplexing
+      server can broker the function call to the proper service.
+      Args:
+        protocol ....... Your communication protocol of choice, e.g. TBinaryProtocol.
+        serviceName .... The service name of the service communicating via this protocol.
+    }
+    constructor Create( const aProtocol : IProtocol; const aServiceName : string);
+    { Prepends the service name to the function name, separated by SEPARATOR.
+      Args: The original message.
+    }
+    procedure WriteMessageBegin( const msg: IMessage); override;
+  end;
+constructor TMultiplexedProtocol.Create(const aProtocol: IProtocol; const aServiceName: string);
+  ASSERT( aServiceName <> '');
+  inherited Create(aProtocol);
+  FServiceName := aServiceName;
+procedure TMultiplexedProtocol.WriteMessageBegin( const msg: IMessage);
+// Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
+var newMsg : IMessage;
+  case msg.Type_ of
+    TMessageType.Call,
+    TMessageType.Oneway : begin
+      newMsg := TMessageImpl.Create( FServiceName + SEPARATOR + msg.Name, msg.Type_, msg.SeqID);
+      inherited WriteMessageBegin( newMsg);
+    end;
+  else
+    inherited WriteMessageBegin( msg);
+  end;
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 4c1954c..b08458a 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -437,6 +437,69 @@
     procedure SetReadLength( readLength: Integer );




+  { TProtocolDecorator forwards all requests to an enclosed TProtocol instance,

+    providing a way to author concise concrete decorator subclasses. The decorator

+    does not (and should not) modify the behaviour of the enclosed TProtocol


+    See p.175 of Design Patterns (by Gamma et al.)

+  }

+  TProtocolDecorator = class( TProtocolImpl)

+  private

+    FWrappedProtocol : IProtocol;


+  public

+    // Encloses the specified protocol.

+    // All operations will be forward to the given protocol.  Must be non-null.

+    constructor Create( const aProtocol : IProtocol);


+    procedure WriteMessageBegin( const msg: IMessage); override;

+    procedure WriteMessageEnd; override;

+    procedure WriteStructBegin( const struc: IStruct); override;

+    procedure WriteStructEnd; override;

+    procedure WriteFieldBegin( const field: IField); override;

+    procedure WriteFieldEnd; override;

+    procedure WriteFieldStop; override;

+    procedure WriteMapBegin( const map: IMap); override;

+    procedure WriteMapEnd; override;

+    procedure WriteListBegin( const list: IList); override;

+    procedure WriteListEnd(); override;

+    procedure WriteSetBegin( const set_: ISet ); override;

+    procedure WriteSetEnd(); override;

+    procedure WriteBool( b: Boolean); override;

+    procedure WriteByte( b: ShortInt); override;

+    procedure WriteI16( i16: SmallInt); override;

+    procedure WriteI32( i32: Integer); override;

+    procedure WriteI64( const i64: Int64); override;

+    procedure WriteDouble( const d: Double); override;

+    procedure WriteString( const s: string ); override;

+    procedure WriteAnsiString( const s: AnsiString); override;

+    procedure WriteBinary( const b: TBytes); override;


+    function ReadMessageBegin: IMessage; override;

+    procedure ReadMessageEnd(); override;

+    function ReadStructBegin: IStruct; override;

+    procedure ReadStructEnd; override;

+    function ReadFieldBegin: IField; override;

+    procedure ReadFieldEnd(); override;

+    function ReadMapBegin: IMap; override;

+    procedure ReadMapEnd(); override;

+    function ReadListBegin: IList; override;

+    procedure ReadListEnd(); override;

+    function ReadSetBegin: ISet; override;

+    procedure ReadSetEnd(); override;

+    function ReadBool: Boolean; override;

+    function ReadByte: ShortInt; override;

+    function ReadI16: SmallInt; override;

+    function ReadI32: Integer; override;

+    function ReadI64: Int64; override;

+    function ReadDouble:Double; override;

+    function ReadBinary: TBytes; override;

+    function ReadString: string; override;

+    function ReadAnsiString: AnsiString; override;

+  end;





 function ConvertInt64ToDouble( const n: Int64): Double;

@@ -1228,5 +1291,275 @@
   Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite);




+{ TProtocolDecorator }


+constructor TProtocolDecorator.Create( const aProtocol : IProtocol);


+  ASSERT( aProtocol <> nil);

+  inherited Create( aProtocol.Transport);

+  FWrappedProtocol := aProtocol;




+procedure TProtocolDecorator.WriteMessageBegin( const msg: IMessage);


+  FWrappedProtocol.WriteMessageBegin( msg);




+procedure TProtocolDecorator.WriteMessageEnd;


+  FWrappedProtocol.WriteMessageEnd;




+procedure TProtocolDecorator.WriteStructBegin( const struc: IStruct);


+  FWrappedProtocol.WriteStructBegin( struc);




+procedure TProtocolDecorator.WriteStructEnd;


+  FWrappedProtocol.WriteStructEnd;




+procedure TProtocolDecorator.WriteFieldBegin( const field: IField);


+  FWrappedProtocol.WriteFieldBegin( field);




+procedure TProtocolDecorator.WriteFieldEnd;


+  FWrappedProtocol.WriteFieldEnd;




+procedure TProtocolDecorator.WriteFieldStop;


+  FWrappedProtocol.WriteFieldStop;




+procedure TProtocolDecorator.WriteMapBegin( const map: IMap);


+  FWrappedProtocol.WriteMapBegin( map);




+procedure TProtocolDecorator.WriteMapEnd;


+  FWrappedProtocol.WriteMapEnd;




+procedure TProtocolDecorator.WriteListBegin( const list: IList);


+  FWrappedProtocol.WriteListBegin( list);




+procedure TProtocolDecorator.WriteListEnd();


+  FWrappedProtocol.WriteListEnd();




+procedure TProtocolDecorator.WriteSetBegin( const set_: ISet );


+  FWrappedProtocol.WriteSetBegin( set_);




+procedure TProtocolDecorator.WriteSetEnd();


+  FWrappedProtocol.WriteSetEnd();




+procedure TProtocolDecorator.WriteBool( b: Boolean);


+  FWrappedProtocol.WriteBool( b);




+procedure TProtocolDecorator.WriteByte( b: ShortInt);


+  FWrappedProtocol.WriteByte( b);




+procedure TProtocolDecorator.WriteI16( i16: SmallInt);


+  FWrappedProtocol.WriteI16( i16);




+procedure TProtocolDecorator.WriteI32( i32: Integer);


+  FWrappedProtocol.WriteI32( i32);




+procedure TProtocolDecorator.WriteI64( const i64: Int64);


+  FWrappedProtocol.WriteI64( i64);




+procedure TProtocolDecorator.WriteDouble( const d: Double);


+  FWrappedProtocol.WriteDouble( d);




+procedure TProtocolDecorator.WriteString( const s: string );


+  FWrappedProtocol.WriteString( s);




+procedure TProtocolDecorator.WriteAnsiString( const s: AnsiString);


+  FWrappedProtocol.WriteAnsiString( s);




+procedure TProtocolDecorator.WriteBinary( const b: TBytes);


+  FWrappedProtocol.WriteBinary( b);




+function TProtocolDecorator.ReadMessageBegin: IMessage;


+  result := FWrappedProtocol.ReadMessageBegin;




+procedure TProtocolDecorator.ReadMessageEnd();


+  FWrappedProtocol.ReadMessageEnd();




+function TProtocolDecorator.ReadStructBegin: IStruct;


+  result := FWrappedProtocol.ReadStructBegin;




+procedure TProtocolDecorator.ReadStructEnd;


+  FWrappedProtocol.ReadStructEnd;




+function TProtocolDecorator.ReadFieldBegin: IField;


+  result := FWrappedProtocol.ReadFieldBegin;




+procedure TProtocolDecorator.ReadFieldEnd();


+  FWrappedProtocol.ReadFieldEnd();




+function TProtocolDecorator.ReadMapBegin: IMap;


+  result := FWrappedProtocol.ReadMapBegin;




+procedure TProtocolDecorator.ReadMapEnd();


+  FWrappedProtocol.ReadMapEnd();




+function TProtocolDecorator.ReadListBegin: IList;


+  result := FWrappedProtocol.ReadListBegin;




+procedure TProtocolDecorator.ReadListEnd();


+  FWrappedProtocol.ReadListEnd();




+function TProtocolDecorator.ReadSetBegin: ISet;


+  result := FWrappedProtocol.ReadSetBegin;




+procedure TProtocolDecorator.ReadSetEnd();


+  FWrappedProtocol.ReadSetEnd();




+function TProtocolDecorator.ReadBool: Boolean;


+  result := FWrappedProtocol.ReadBool;




+function TProtocolDecorator.ReadByte: ShortInt;


+  result := FWrappedProtocol.ReadByte;




+function TProtocolDecorator.ReadI16: SmallInt;


+  result := FWrappedProtocol.ReadI16;




+function TProtocolDecorator.ReadI32: Integer;


+  result := FWrappedProtocol.ReadI32;




+function TProtocolDecorator.ReadI64: Int64;


+  result := FWrappedProtocol.ReadI64;




+function TProtocolDecorator.ReadDouble:Double;


+  result := FWrappedProtocol.ReadDouble;




+function TProtocolDecorator.ReadBinary: TBytes;


+  result := FWrappedProtocol.ReadBinary;




+function TProtocolDecorator.ReadString: string;


+  result := FWrappedProtocol.ReadString;




+function TProtocolDecorator.ReadAnsiString: AnsiString;


+  result := FWrappedProtocol.ReadAnsiString;







diff --git a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
new file mode 100644
index 0000000..2cc7ab0
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas
@@ -0,0 +1,130 @@
+unit Multiplex.Client.Main;
+{.$DEFINE StressTest}   // activate to stress-test the server with frequent connects/disconnects
+{.$DEFINE PerfTest}     // activate to activate the performance test
+  Windows, SysUtils, Classes,
+  DateUtils,
+  Generics.Collections,
+  Thrift,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex,
+  Thrift.Transport.Pipes,
+  Thrift.Transport,
+  Thrift.Stream,
+  Thrift.Collections,
+  Benchmark,  // in gen-delphi folder
+  Aggr,       // in gen-delphi folder
+  Multiplex.Test.Common;
+  TTestClient = class
+  protected
+    FProtocol : IProtocol;
+    procedure ParseArgs( const args: array of string);
+    procedure Setup;
+    procedure Run;
+  public
+    constructor Create( const args: array of string);
+    class procedure Execute( const args: array of string);
+  end;
+  IServiceClient = interface
+    ['{7745C1C2-AB20-43BA-B6F0-08BF92DE0BAC}']

+    procedure Test;

+  end;
+//--- TTestClient -------------------------------------
+class procedure TTestClient.Execute( const args: array of string);
+var client : TTestClient;
+  client := TTestClient.Create(args);
+  try
+    client.Run;
+  finally

+    client.Free;

+  end;
+constructor TTestClient.Create( const args: array of string);
+  ParseArgs(args);
+  Setup;
+procedure TTestClient.ParseArgs( const args: array of string);
+  if Length(args) <> 0
+  then raise Exception.Create('No args accepted so far');
+procedure TTestClient.Setup;
+var trans : ITransport;
+  trans := TSocketImpl.Create( 'localhost', 9090);
+  trans := TFramedTransportImpl.Create( trans);
+  trans.Open;
+  FProtocol := TBinaryProtocolImpl.Create( trans, TRUE, TRUE);
+procedure TTestClient.Run;
+var bench : TBenchmarkService.Iface;
+    aggr  : TAggr.Iface;
+    multiplex : IProtocol;
+    i         : Integer;
+  try
+    multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_BENCHMARKSERVICE);
+    bench     := TBenchmarkService.TClient.Create( multiplex);
+    multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_AGGR);
+    aggr      := TAggr.TClient.Create( multiplex);
+    for i := 1 to 10
+    do aggr.addValue( bench.fibonacci(i));
+    for i in aggr.getValues
+    do Write(IntToStr(i)+' ');
+    WriteLn;
+  except
+    on e:Exception do Writeln(#10+e.Message);

+  end;

diff --git a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
new file mode 100644
index 0000000..4f5cd13
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas
@@ -0,0 +1,201 @@
+unit Multiplex.Server.Main;
+{.$DEFINE RunEndless}   // activate to interactively stress-test the server stop routines via Ctrl+C
+  Windows, SysUtils,
+  Generics.Collections,
+  Thrift.Console,
+  Thrift.Server,
+  Thrift.Transport,
+  Thrift.Transport.Pipes,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex,
+  Thrift.Processor.Multiplex,
+  Thrift.Collections,
+  Thrift.Utils,
+  Thrift,
+  Benchmark,  // in gen-delphi folder
+  Aggr,       // in gen-delphi folder
+  Multiplex.Test.Common,
+  Contnrs;
+  TTestServer = class
+  public type
+    ITestHandler = interface
+      ['{CAE09AAB-80FB-48E9-B3A8-7F9B96F5419A}']
+      procedure SetServer( const AServer : IServer );
+    end;
+  protected type
+    TTestHandlerImpl = class( TInterfacedObject, ITestHandler)
+    private
+      FServer : IServer;
+    protected
+      // ITestHandler
+      procedure SetServer( const AServer : IServer );
+      property Server : IServer read FServer write SetServer;
+    end;
+    TBenchmarkServiceImpl = class( TTestHandlerImpl, TBenchmarkService.Iface)
+    protected
+      // TBenchmarkService.Iface
+      function fibonacci(n: ShortInt): Integer;
+    end;
+    TAggrImpl = class( TTestHandlerImpl, TAggr.Iface)
+    protected
+      FList : IThriftList<Integer>;
+      // TAggr.Iface
+      procedure addValue(value: Integer);
+      function getValues(): IThriftList<Integer>;

+    public

+      constructor Create;

+      destructor Destroy;  override;

+    end;

+  public
+    class procedure Execute( const args: array of string);
+  end;
+{ TTestServer.TTestHandlerImpl }
+procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);
+  FServer := AServer;
+{ TTestServer.TBenchmarkServiceImpl }
+function TTestServer.TBenchmarkServiceImpl.fibonacci(n: ShortInt): Integer;
+var prev, next : Integer;
+  prev   := 0;
+  result := 1;
+  while n > 0 do begin
+    next   := result + prev;

+    prev   := result;

+    result := next;

+    Dec(n);

+  end;
+{ TTestServer.TAggrImpl }
+constructor TTestServer.TAggrImpl.Create;

+  inherited Create;
+  FList := TThriftListImpl<Integer>.Create;

+destructor TTestServer.TAggrImpl.Destroy;  

+  try
+    FreeAndNil( FList);

+  finally

+    inherited Destroy;

+  end;                  

+procedure TTestServer.TAggrImpl.addValue(value: Integer);
+  FList.Add( value);

+function TTestServer.TAggrImpl.getValues(): IThriftList<Integer>;
+  result := FList;

+{ TTestServer }    
+class procedure TTestServer.Execute( const args: array of string);
+  TransportFactory : ITransportFactory;
+  ProtocolFactory  : IProtocolFactory;
+  ServerTrans      : IServerTransport;
+  benchHandler     : TBenchmarkService.Iface;
+  aggrHandler      : TAggr.Iface;
+  benchProcessor   : IProcessor;
+  aggrProcessor    : IProcessor;
+  multiplex        : IMultiplexedProcessor;
+  ServerEngine     : IServer;
+  try
+    // create protocol factory, default to BinaryProtocol
+    ProtocolFactory  := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE);
+    servertrans      := TServerSocketImpl.Create( 9090, 0, FALSE);
+    TransportFactory := TFramedTransportImpl.TFactory.Create;
+    benchHandler     := TBenchmarkServiceImpl.Create;
+    benchProcessor   := TBenchmarkService.TProcessorImpl.Create( benchHandler);
+    aggrHandler      := TAggrImpl.Create;
+    aggrProcessor    := TAggr.TProcessorImpl.Create( aggrHandler);
+    multiplex        := TMultiplexedProcessorImpl.Create;
+    multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor);
+    multiplex.RegisterProcessor( NAME_AGGR,  aggrProcessor);
+    ServerEngine := TSimpleServer.Create( multiplex,
+                                          ServerTrans,
+                                          TransportFactory,
+                                          ProtocolFactory);
+    (benchHandler as ITestHandler).SetServer( ServerEngine);
+    (aggrHandler as ITestHandler).SetServer( ServerEngine);
+    Console.WriteLine('Starting the server ...');
+    ServerEngine.serve();
+    (benchHandler as ITestHandler).SetServer( nil);
+    (aggrHandler as ITestHandler).SetServer( nil);
+  except
+    on E: Exception do
+    begin
+      Console.Write( E.Message);
+    end;
+  end;
+  Console.WriteLine( 'done.');
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
new file mode 100644
index 0000000..23e296a
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr
@@ -0,0 +1,65 @@
+program Multiplex.Test.Client;
+  SysUtils,

+  Multiplex.Client.Main in 'Multiplex.Client.Main.pas',

+  Thrift in '..\..\src\Thrift.pas',

+  Thrift.Transport in '..\..\src\Thrift.Transport.pas',

+  Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',

+  Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',

+  Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',

+  Thrift.Collections in '..\..\src\Thrift.Collections.pas',

+  Thrift.Server in '..\..\src\Thrift.Server.pas',

+  Thrift.Stream in '..\..\src\Thrift.Stream.pas',

+  Thrift.Console in '..\..\src\Thrift.Console.pas',

+  Thrift.Utils in '..\..\src\Thrift.Utils.pas';


+  nParamCount : Integer;
+  args : array of string;
+  i : Integer;
+  arg : string;
+  s : string;
+  try
+    Writeln( 'Multiplex TestClient '+Thrift.Version);
+    nParamCount := ParamCount;
+    SetLength( args, nParamCount);
+    for i := 1 to nParamCount do
+    begin
+      arg := ParamStr( i );
+      args[i-1] := arg;
+    end;
+    TTestClient.Execute( args );
+    Readln;
+  except
+    on E: Exception do begin
+      Writeln(E.ClassName, ': ', E.Message);
+      ExitCode := $FFFF;
+    end;
+  end;
diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
new file mode 100644
index 0000000..231c3ad
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas
@@ -0,0 +1,35 @@

+unit Multiplex.Test.Common;





+  NAME_BENCHMARKSERVICE = 'BenchmarkService';

+  NAME_AGGR             = 'Aggr';





+// nix





diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
new file mode 100644
index 0000000..9da1cdc
--- /dev/null
+++ b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr
@@ -0,0 +1,65 @@
+program Multiplex.Test.Server;
+  SysUtils,

+  Multiplex.Server.Main in 'Multiplex.Server.Main.pas',

+  Thrift in '..\..\src\Thrift.pas',

+  Thrift.Transport in '..\..\src\Thrift.Transport.pas',

+  Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',

+  Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',

+  Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas',

+  Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas',

+  Thrift.Collections in '..\..\src\Thrift.Collections.pas',

+  Thrift.Server in '..\..\src\Thrift.Server.pas',

+  Thrift.Console in '..\..\src\Thrift.Console.pas',

+  Thrift.Utils in '..\..\src\Thrift.Utils.pas',

+  Thrift.Stream in '..\..\src\Thrift.Stream.pas';


+  nParamCount : Integer;
+  args : array of string;
+  i : Integer;
+  arg : string;
+  s : string;
+  try
+    Writeln( 'Multiplex TestServer '+Thrift.Version);
+    nParamCount := ParamCount;
+    SetLength( args, nParamCount);
+    for i := 1 to nParamCount do
+    begin
+      arg := ParamStr( i );
+      args[i-1] := arg;
+    end;
+    TTestServer.Execute( args );
+    Writeln('Press ENTER to close ... '); Readln;
+  except
+    on E: Exception do
+      Writeln(E.ClassName, ': ', E.Message);
+  end;