THRIFT-2382 contrib: sample for connecting Thrift with STOMP

Patch: Jens Geyer
diff --git a/contrib/Stomp/Thrift.Transport.STOMP.pas b/contrib/Stomp/Thrift.Transport.STOMP.pas
new file mode 100644
index 0000000..7dfb376
--- /dev/null
+++ b/contrib/Stomp/Thrift.Transport.STOMP.pas
@@ -0,0 +1,200 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Transport.STOMP;
+
+interface
+
+uses
+  Classes,Windows, SysUtils,
+  Thrift,
+  Thrift.Transport,
+  Thrift.Protocol,
+  Thrift.Stream,
+  StompClient,
+  StompTypes;
+
+type
+  TStompTransportImpl = class( TStreamTransportImpl)
+  strict private
+    FData     : TStringStream;
+    FServer   : string;
+    FOutQueue : string;
+    FStompCli : IStompClient;
+  protected
+    function GetIsOpen: Boolean; override;
+    function Peek: Boolean; override;
+  public
+    constructor Create( const aServerAndPort, aOutQueue : string);
+    destructor Destroy;  override;
+
+    procedure Open();  override;
+    procedure Close();  override;
+    procedure Flush;  override;
+  end;
+
+
+  TStompServerTransportImpl = class( TServerTransportImpl)
+  strict private
+    FServer  : string;
+    FInQueue : string;
+    FClient  : IStompClient;
+  protected
+    procedure Listen; override;
+    procedure Close; override;
+    function Accept( const fnAccepting: TProc): ITransport; override;
+  public
+    constructor Create( const aServerAndPort, aInQueue : string);
+    destructor Destroy;  override;
+  end;
+
+
+const
+  QUEUE_PREFIX    = '/queue/';
+  TOPIC_PREFIX    = '/topic/';
+  EXCHANGE_PREFIX = '/exchange/';
+
+
+implementation
+
+
+
+constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
+var adapter : IThriftStream;
+begin
+  FData     := TStringStream.Create;
+  FServer   := aServerAndPort;
+  FOutQueue := aOutQueue;
+
+  adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
+  inherited Create( nil, adapter);  // output only
+end;
+
+
+destructor TStompTransportImpl.Destroy;
+begin
+  inherited Destroy;
+  FreeAndNil( FData);
+  FStompCli := nil;
+end;
+
+
+function TStompTransportImpl.GetIsOpen: Boolean;
+begin
+  result := (FStompCli <> nil);
+end;
+
+
+function TStompTransportImpl.Peek: Boolean;
+begin
+  result := FALSE;  // output only
+end;
+
+
+procedure TStompTransportImpl.Open;
+begin
+  if FStompCli <> nil
+  then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
+  else FStompCli := StompUtils.NewStomp( FServer);
+end;
+
+
+procedure TStompTransportImpl.Close;
+begin
+  FStompCli := nil;
+  FData.Clear;
+end;
+
+
+procedure TStompTransportImpl.Flush;
+begin
+  if FStompCli = nil
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
+
+  FStompCli.Send( FOutQueue, FData.DataString);
+  FData.Clear;
+end;
+
+
+//--- TStompServerTransportImpl --------------------------------------------
+
+
+constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
+begin
+  inherited Create;
+  FServer  := aServerAndPort;
+  FInQueue := aInQueue;
+end;
+
+
+destructor TStompServerTransportImpl.Destroy;
+begin
+  try
+    Close;
+  finally
+    inherited Destroy;
+  end;
+end;
+
+
+procedure TStompServerTransportImpl.Listen;
+begin
+  FClient := StompUtils.NewStomp(FServer);
+  FClient.Subscribe( FInQueue);
+end;
+
+
+procedure TStompServerTransportImpl.Close;
+begin
+  if FClient <> nil then begin
+    FClient.Unsubscribe( FInQueue);
+    FClient := nil;
+  end;
+end;
+
+
+function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
+var frame   : IStompFrame;
+    adapter : IThriftStream;
+    stream  : TStringStream;
+begin
+  if FClient = nil
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                         'Not connected.');
+
+  if Assigned(fnAccepting)
+  then fnAccepting();
+
+  try
+    frame := FClient.Receive(MAXINT);
+    if frame = nil then Exit(nil);
+
+    stream  := TStringStream.Create( frame.GetBody);
+    adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
+    result  := TStreamTransportImpl.Create( adapter, nil);
+
+  except
+    on E: Exception
+    do raise TTransportException.Create( E.ToString );
+  end;
+end;
+
+
+end.
+