THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas
new file mode 100644
index 0000000..b771d43
--- /dev/null
+++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas
@@ -0,0 +1,182 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements. See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership. The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License. You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied. See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ *)

+

+unit Thrift.Processor.Multiplex;
+
+
+interface
+
+uses
+  SysUtils,
+  Generics.Collections,
+  Thrift,
+  Thrift.Protocol,
+  Thrift.Protocol.Multiplex;
+
+{ TMultiplexedProcessorImpl is an IProcessor allowing a single server to provide multiple services.
+  To do so, you instantiate the processor and then register additional processors with it.
+  The following example shows the idea (written in Java syntax, not Delphi):
+
+
+     TMultiplexedProcessor processor = new TMultiplexedProcessor();
+
+     processor.registerProcessor(
+         "Calculator",
+         new Calculator.Processor(new CalculatorHandler()));
+
+     processor.registerProcessor(
+         "WeatherReport",
+         new WeatherReport.Processor(new WeatherReportHandler()));
+
+     TServerTransport t = new TServerSocket(9090);
+     TSimpleServer server = new TSimpleServer(processor, t);
+
+     server.serve();
+}
+
+
+type
+  IMultiplexedProcessor = interface( IProcessor)
+    ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']

+    // Registers the processor of one service under the given service name.
+    // Requests are brokered to the individual services by using the service
+    // name carried in the message name to select the processor at request time.
+    procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+  end;
+
+
+  TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
+  private type
+    // Protocol decorator that replays a previously read message header: the service
+    // processors call ReadMessageBegin() and get an IMessage in exactly the standard
+    // format, without the service name prepended to the message name.
+    TStoredMessageProtocol = class( TProtocolDecorator)
+    private
+      FMessageBegin : IMessage;
+    public
+      constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+      function ReadMessageBegin: IMessage; override;
+    end;
+
+  private
+    FServiceProcessorMap : TDictionary<String, IProcessor>;
+
+  public
+    constructor Create;
+    destructor Destroy;  override;
+
+    // Registers the processor of one service under the given service name, so
+    // that Process can broker requests to the individual services by using the
+    // service name to select them at request time.
+    procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
+
+    { This implementation of Process performs the following steps:
+      - Read the beginning of the message.
+      - Extract the service name from the message name.
+      - Use the service name to locate the appropriate processor.
+      - Dispatch to the processor, with a decorated IProtocol instance
+        whose ReadMessageBegin() returns the message without the service name.
+
+      A TApplicationException is raised if the message type is not Call or Oneway,
+      if the name holds no service name, or if the service is not registered.
+    }
+    function Process(const iprot, oprot : IProtocol) : Boolean;
+  end;
+
+
+implementation
+
+constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage);
+begin
+  inherited Create( protocol);   // the decorator base wraps the given protocol
+  FMessageBegin := aMsgBegin;    // header handed out later by ReadMessageBegin
+end;
+
+
+function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage;
+begin
+  result := FMessageBegin;  // returns the stored header; the wrapped protocol is not called here
+end;
+
+
+constructor TMultiplexedProcessorImpl.Create;
+begin
+  inherited Create;
+  FServiceProcessorMap := TDictionary<string,IProcessor>.Create;  // service name -> processor; freed in Destroy
+end;
+
+
+destructor TMultiplexedProcessorImpl.Destroy;
+begin
+  try
+    FreeAndNil( FServiceProcessorMap);  // the map is created in the constructor and released here
+  finally
+    inherited Destroy;  // runs even if freeing the map raises
+  end;
+end;
+
+
+procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor);
+begin
+  FServiceProcessorMap.Add( serviceName, processor);  // NOTE: TDictionary.Add raises if serviceName is already registered
+end;
+
+
+function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean;
+var msg, newMsg : IMessage;
+    idx         : Integer;
+    sService    : string;
+    processor   : IProcessor;
+    protocol    : IProtocol;
+const
+  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
+  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexedProtocol.';
+  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
+begin
+  // Read the header with the actual underlying protocol (e.g. TBinaryProtocol). This pulls it
+  // "off the wire", so the selected processor gets it replayed by TStoredMessageProtocol below.
+  msg := iprot.readMessageBegin();
+  if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway])
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType,
+                                           ERROR_INVALID_MSGTYPE);
+
+  // Locate the separator; a name without it was not written by a multiplexing client
+  idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name);
+  if idx < 1
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol,
+                                           Format(ERROR_INCOMPATIBLE_PROT,[msg.Name]));
+
+  // The service name is everything before the separator; look up its registered processor
+  sService := Copy( msg.Name, 1, idx-1);
+  if not FServiceProcessorMap.TryGetValue( sService, processor)
+  then raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError,
+                                           Format(ERROR_UNKNOWN_SERVICE,[sService]));
+
+  // Create a new message with the service name and separator removed from the name
+  Inc( idx, Length(TMultiplexedProtocol.SEPARATOR));
+  newMsg := TMessageImpl.Create( Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID);
+
+  // Dispatch to the selected processor; its result is returned unchanged
+  protocol := TStoredMessageProtocol.Create( iprot, newMsg);
+  result   := processor.process( protocol, oprot);
+end;
+
+
+end.
+
diff --git a/lib/delphi/src/Thrift.Protocol.Multiplex.pas b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
new file mode 100644
index 0000000..2cd2401
--- /dev/null
+++ b/lib/delphi/src/Thrift.Protocol.Multiplex.pas
@@ -0,0 +1,107 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements. See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership. The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License. You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied. See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ *)

+

+unit Thrift.Protocol.Multiplex;
+
+interface
+
+uses Thrift.Protocol;
+
+{ TMultiplexedProtocol is a protocol-independent concrete decorator
+  that allows a Thrift client to communicate with a multiplexing Thrift server,
+  by prepending the service name to the function name during function calls.
+
+  NOTE: THIS IS NOT USED BY SERVERS.
+  On the server, use TMultiplexedProcessorImpl to handle requests from a multiplexing client.
+
+  This example (written in Java syntax, not Delphi) uses a single socket transport to invoke two services:
+
+      TSocket transport = new TSocket("localhost", 9090);
+      transport.open();
+
+      TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+      TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+      Calculator.Client service = new Calculator.Client(mp);
+
+      TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+      WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+
+      System.out.println(service.add(2,2));
+      System.out.println(service2.getTemperature());
+
+}
+
+type
+  TMultiplexedProtocol = class( TProtocolDecorator)
+  public const
+    { Delimits the service name from the function name in the message name }
+    SEPARATOR = ':';
+
+  private
+     FServiceName : String;
+
+  public
+    { Wraps the specified protocol, allowing it to be used to communicate with a multiplexing server.
+      The service name is required as it is prepended to the message header so that the multiplexing
+      server can broker the function call to the proper service.
+
+      Args:
+        aProtocol ...... Your communication protocol of choice, e.g. TBinaryProtocol.
+        aServiceName ... The service name of the service communicating via this protocol; must not be empty.
+    }
+    constructor Create( const aProtocol : IProtocol; const aServiceName : string);
+
+    { Prepends the service name to the function name of Call and Oneway messages, separated by SEPARATOR.
+      Args: msg ... The original message; messages of any other type are written unchanged.
+    }
+    procedure WriteMessageBegin( const msg: IMessage); override;
+  end;
+
+
+implementation
+
+
+constructor TMultiplexedProtocol.Create(const aProtocol: IProtocol; const aServiceName: string);
+begin
+  ASSERT( aServiceName <> '');   // the service name is mandatory
+  inherited Create(aProtocol);   // the decorator base wraps the given protocol
+  FServiceName := aServiceName;  // prefix used by WriteMessageBegin
+end;
+
+
+procedure TMultiplexedProtocol.WriteMessageBegin( const msg: IMessage);
+// Call and Oneway headers are written as <service name> + SEPARATOR + <function name>;
+// headers of any other message type are handed to the inherited implementation unchanged.
+var outgoing : IMessage;
+begin
+  if msg.Type_ in [TMessageType.Call, TMessageType.Oneway] then begin
+    // rebuild the header with the prefixed name, keeping type and sequence ID
+    outgoing := TMessageImpl.Create( FServiceName + SEPARATOR + msg.Name, msg.Type_, msg.SeqID);
+  end
+  else begin
+    outgoing := msg;
+  end;
+
+  inherited WriteMessageBegin( outgoing);
+end;
+
+
+end.
+
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 4c1954c..b08458a 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -437,6 +437,69 @@
     procedure SetReadLength( readLength: Integer );

   end;

 

+

+  { TProtocolDecorator forwards all requests to an enclosed TProtocol instance,

+    providing a way to author concise concrete decorator subclasses. The decorator

+    does not (and should not) modify the behaviour of the enclosed TProtocol.

+

+    See p.175 of Design Patterns (by Gamma et al.)

+  }

+  TProtocolDecorator = class( TProtocolImpl)

+  private

+    FWrappedProtocol : IProtocol;

+

+  public

+    // Encloses the specified protocol.

+    // All operations will be forwarded to the given protocol.  Must not be nil.

+    constructor Create( const aProtocol : IProtocol);

+

+    procedure WriteMessageBegin( const msg: IMessage); override;

+    procedure WriteMessageEnd; override;

+    procedure WriteStructBegin( const struc: IStruct); override;

+    procedure WriteStructEnd; override;

+    procedure WriteFieldBegin( const field: IField); override;

+    procedure WriteFieldEnd; override;

+    procedure WriteFieldStop; override;

+    procedure WriteMapBegin( const map: IMap); override;

+    procedure WriteMapEnd; override;

+    procedure WriteListBegin( const list: IList); override;

+    procedure WriteListEnd(); override;

+    procedure WriteSetBegin( const set_: ISet ); override;

+    procedure WriteSetEnd(); override;

+    procedure WriteBool( b: Boolean); override;

+    procedure WriteByte( b: ShortInt); override;

+    procedure WriteI16( i16: SmallInt); override;

+    procedure WriteI32( i32: Integer); override;

+    procedure WriteI64( const i64: Int64); override;

+    procedure WriteDouble( const d: Double); override;

+    procedure WriteString( const s: string ); override;

+    procedure WriteAnsiString( const s: AnsiString); override;

+    procedure WriteBinary( const b: TBytes); override;

+

+    function ReadMessageBegin: IMessage; override;

+    procedure ReadMessageEnd(); override;

+    function ReadStructBegin: IStruct; override;

+    procedure ReadStructEnd; override;

+    function ReadFieldBegin: IField; override;

+    procedure ReadFieldEnd(); override;

+    function ReadMapBegin: IMap; override;

+    procedure ReadMapEnd(); override;

+    function ReadListBegin: IList; override;

+    procedure ReadListEnd(); override;

+    function ReadSetBegin: ISet; override;

+    procedure ReadSetEnd(); override;

+    function ReadBool: Boolean; override;

+    function ReadByte: ShortInt; override;

+    function ReadI16: SmallInt; override;

+    function ReadI32: Integer; override;

+    function ReadI64: Int64; override;

+    function ReadDouble:Double; override;

+    function ReadBinary: TBytes; override;

+    function ReadString: string; override;

+    function ReadAnsiString: AnsiString; override;

+  end;

+

+

 implementation

 

 function ConvertInt64ToDouble( const n: Int64): Double;

@@ -1228,5 +1291,275 @@
   Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite);

 end;

 

+

+{ TProtocolDecorator }

+

+constructor TProtocolDecorator.Create( const aProtocol : IProtocol);

+begin

+  ASSERT( aProtocol <> nil);

+  inherited Create( aProtocol.Transport);

+  FWrappedProtocol := aProtocol;

+end;

+

+

+procedure TProtocolDecorator.WriteMessageBegin( const msg: IMessage);

+begin

+  FWrappedProtocol.WriteMessageBegin( msg);

+end;

+

+

+procedure TProtocolDecorator.WriteMessageEnd;

+begin

+  FWrappedProtocol.WriteMessageEnd;

+end;

+

+

+procedure TProtocolDecorator.WriteStructBegin( const struc: IStruct);

+begin

+  FWrappedProtocol.WriteStructBegin( struc);

+end;

+

+

+procedure TProtocolDecorator.WriteStructEnd;

+begin

+  FWrappedProtocol.WriteStructEnd;

+end;

+

+

+procedure TProtocolDecorator.WriteFieldBegin( const field: IField);

+begin

+  FWrappedProtocol.WriteFieldBegin( field);

+end;

+

+

+procedure TProtocolDecorator.WriteFieldEnd;

+begin

+  FWrappedProtocol.WriteFieldEnd;

+end;

+

+

+procedure TProtocolDecorator.WriteFieldStop;

+begin

+  FWrappedProtocol.WriteFieldStop;

+end;

+

+

+procedure TProtocolDecorator.WriteMapBegin( const map: IMap);

+begin

+  FWrappedProtocol.WriteMapBegin( map);

+end;

+

+

+procedure TProtocolDecorator.WriteMapEnd;

+begin

+  FWrappedProtocol.WriteMapEnd;

+end;

+

+

+procedure TProtocolDecorator.WriteListBegin( const list: IList);

+begin

+  FWrappedProtocol.WriteListBegin( list);

+end;

+

+

+procedure TProtocolDecorator.WriteListEnd();

+begin

+  FWrappedProtocol.WriteListEnd();

+end;

+

+

+procedure TProtocolDecorator.WriteSetBegin( const set_: ISet );

+begin

+  FWrappedProtocol.WriteSetBegin( set_);

+end;

+

+

+procedure TProtocolDecorator.WriteSetEnd();

+begin

+  FWrappedProtocol.WriteSetEnd();

+end;

+

+

+procedure TProtocolDecorator.WriteBool( b: Boolean);

+begin

+  FWrappedProtocol.WriteBool( b);

+end;

+

+

+procedure TProtocolDecorator.WriteByte( b: ShortInt);

+begin

+  FWrappedProtocol.WriteByte( b);

+end;

+

+

+procedure TProtocolDecorator.WriteI16( i16: SmallInt);

+begin

+  FWrappedProtocol.WriteI16( i16);

+end;

+

+

+procedure TProtocolDecorator.WriteI32( i32: Integer);

+begin

+  FWrappedProtocol.WriteI32( i32);

+end;

+

+

+procedure TProtocolDecorator.WriteI64( const i64: Int64);

+begin

+  FWrappedProtocol.WriteI64( i64);

+end;

+

+

+procedure TProtocolDecorator.WriteDouble( const d: Double);

+begin

+  FWrappedProtocol.WriteDouble( d);

+end;

+

+

+procedure TProtocolDecorator.WriteString( const s: string );

+begin

+  FWrappedProtocol.WriteString( s);

+end;

+

+

+procedure TProtocolDecorator.WriteAnsiString( const s: AnsiString);

+begin

+  FWrappedProtocol.WriteAnsiString( s);

+end;

+

+

+procedure TProtocolDecorator.WriteBinary( const b: TBytes);

+begin

+  FWrappedProtocol.WriteBinary( b);

+end;

+

+

+function TProtocolDecorator.ReadMessageBegin: IMessage;

+begin

+  result := FWrappedProtocol.ReadMessageBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadMessageEnd();

+begin

+  FWrappedProtocol.ReadMessageEnd();

+end;

+

+

+function TProtocolDecorator.ReadStructBegin: IStruct;

+begin

+  result := FWrappedProtocol.ReadStructBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadStructEnd;

+begin

+  FWrappedProtocol.ReadStructEnd;

+end;

+

+

+function TProtocolDecorator.ReadFieldBegin: IField;

+begin

+  result := FWrappedProtocol.ReadFieldBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadFieldEnd();

+begin

+  FWrappedProtocol.ReadFieldEnd();

+end;

+

+

+function TProtocolDecorator.ReadMapBegin: IMap;

+begin

+  result := FWrappedProtocol.ReadMapBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadMapEnd();

+begin

+  FWrappedProtocol.ReadMapEnd();

+end;

+

+

+function TProtocolDecorator.ReadListBegin: IList;

+begin

+  result := FWrappedProtocol.ReadListBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadListEnd();

+begin

+  FWrappedProtocol.ReadListEnd();

+end;

+

+

+function TProtocolDecorator.ReadSetBegin: ISet;

+begin

+  result := FWrappedProtocol.ReadSetBegin;

+end;

+

+

+procedure TProtocolDecorator.ReadSetEnd();

+begin

+  FWrappedProtocol.ReadSetEnd();

+end;

+

+

+function TProtocolDecorator.ReadBool: Boolean;

+begin

+  result := FWrappedProtocol.ReadBool;

+end;

+

+

+function TProtocolDecorator.ReadByte: ShortInt;

+begin

+  result := FWrappedProtocol.ReadByte;

+end;

+

+

+function TProtocolDecorator.ReadI16: SmallInt;

+begin

+  result := FWrappedProtocol.ReadI16;

+end;

+

+

+function TProtocolDecorator.ReadI32: Integer;

+begin

+  result := FWrappedProtocol.ReadI32;

+end;

+

+

+function TProtocolDecorator.ReadI64: Int64;

+begin

+  result := FWrappedProtocol.ReadI64;

+end;

+

+

+function TProtocolDecorator.ReadDouble:Double;

+begin

+  result := FWrappedProtocol.ReadDouble;

+end;

+

+

+function TProtocolDecorator.ReadBinary: TBytes;

+begin

+  result := FWrappedProtocol.ReadBinary;

+end;

+

+

+function TProtocolDecorator.ReadString: string;

+begin

+  result := FWrappedProtocol.ReadString;

+end;

+

+

+function TProtocolDecorator.ReadAnsiString: AnsiString;

+begin

+  result := FWrappedProtocol.ReadAnsiString;

+end;

+

+

+

 end.